I'm using async-await in .NET. How can I limit the number of concurrent asynchronous calls?
One relatively simple way is to (ab)use TPL Dataflow. Something like:
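using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public IEnumerable<TOutput> AsyncThrottle<TInput, TOutput>(
    IEnumerable<TInput> inputs, Func<TInput, Task<TOutput>> asyncFunction,
    int maxDegreeOfParallelism)
{
    var outputs = new ConcurrentQueue<TOutput>();
    // Run at most maxDegreeOfParallelism invocations of asyncFunction at once.
    var block = new ActionBlock<TInput>(
        async x => outputs.Enqueue(await asyncFunction(x)),
        new ExecutionDataflowBlockOptions
        { MaxDegreeOfParallelism = maxDegreeOfParallelism });
    foreach (var input in inputs)
        block.Post(input);
    block.Complete();
    // Blocks the calling thread until every input has been processed.
    block.Completion.Wait();
    return outputs.ToArray();
}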
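The method above blocks its caller on Completion.Wait(). A non-blocking variant might look like the following. This is only a sketch: the class name Throttling and the method name AsyncThrottleAsync are made up for illustration, and it assumes the caller is itself able to await the result.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Throttling
{
    // Hypothetical helper: same idea as AsyncThrottle, but it returns a Task
    // instead of blocking the calling thread.
    public static async Task<TOutput[]> AsyncThrottleAsync<TInput, TOutput>(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TOutput>> asyncFunction,
        int maxDegreeOfParallelism)
    {
        var outputs = new ConcurrentQueue<TOutput>();
        var block = new ActionBlock<TInput>(
            async x => outputs.Enqueue(await asyncFunction(x)),
            new ExecutionDataflowBlockOptions
            { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        foreach (var input in inputs)
            await block.SendAsync(input);   // asynchronous counterpart of Post

        block.Complete();
        await block.Completion;             // rethrows if asyncFunction faulted
        return outputs.ToArray();           // completion order, not input order
    }
}
```

Note that the results come back in the order in which the calls finished, which is not necessarily the order of the inputs.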
Parallel.ForEach() doesn't support async. You want to start a new Task only after a previous one has completed, and the only way to achieve that with Parallel.ForEach() would be to call Wait() explicitly (which is obviously undesirable). – Offshore
[…] bitmaps collection. And async/await doesn't do what you want by itself, so you need something. That something may as well be TPL Dataflow. If you want to perform different actions, you could use delegates: new ActionBlock<Func<Task>>(async f => await f()). – Offshore
[…] TaskCompletionSource, because you need to return some Task early, even before the delegate is executed. – Offshore
[…] TaskCompletionSource requirement when I realized that I need to return a task before I even start one! I thought that I could chain the results of WhenAny, but it's too difficult to get the correct input list, which changes over time, and all those continuations are inelegant anyway. I added the solution below. – Roshelle
Note: I leave this here for legacy. Don't do it this way, because there will be too many tasks waiting on the WhenAny simultaneously, and the stack will get deep.
Based on this code by Stephen Toub:
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}
while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);
        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }
    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}
I wrote this:
Private ThrottleGroups As New Dictionary(Of Object, List(Of Task))
Public Async Function ThrottleAsync(Of TResult)(ByVal f As Func(Of Task(Of TResult)), GroupId As Object, MaxCount As Integer) As Task(Of TResult)
    If Not ThrottleGroups.ContainsKey(GroupId) Then
        ThrottleGroups.Add(GroupId, New List(Of Task))
    End If
    If ThrottleGroups(GroupId).Count < MaxCount Then
        Dim NewTask As Task(Of TResult) = f()
        ThrottleGroups(GroupId).Add(NewTask)
        Return Await NewTask
    Else
        Dim FinishedTask As Task = Await Task.WhenAny(ThrottleGroups(GroupId))
        ThrottleGroups(GroupId).Remove(FinishedTask)
        Return Await ThrottleAsync(f, GroupId, MaxCount)
    End If
End Function
To use, just replace:
ExampleAsync(param1, param2)
with:
Dim f As Func(Of Task(Of Integer))
f = Function()
        Return ExampleAsync(param1, param2)
    End Function
Const CONCURRENT_TASKS As Integer = 4
Return ThrottleAsync(f, "ExampleAsync", CONCURRENT_TASKS)
Notice that we have to wrap the call to the task in a function f, because otherwise we would already be starting the Task. The second parameter to ThrottleAsync is any object that identifies the "group"; I used a string. All asynchronous tasks in the same "group" are limited to CONCURRENT_TASKS tasks, in this case 4.
Here's sample code that shows how only four threads run at a time. "all ready!" displays immediately because the subroutine is asynchronous. Also, even if the threads start or end out of order, the "Output" lines will still be in the same order as the input.
Dim results As New List(Of Task(Of Integer))
For i As Integer = 0 To 20
    Dim j As Integer = i
    Dim f As Func(Of Task(Of Integer))
    f = Function() As Task(Of Integer)
            Return Task.Run(Function() As Integer
                                Debug.WriteLine(DateTime.Now & "Starting " & j)
                                System.Threading.Thread.Sleep(5000)
                                Debug.WriteLine(DateTime.Now & "Ending " & j)
                                Return j
                            End Function)
        End Function
    Const CONCURRENT_UPLOADS As Integer = 4
    results.Add(ThrottleAsync(f, "PutObjectAsync", CONCURRENT_UPLOADS))
Next
Debug.WriteLine("all ready!")
For Each x As Task(Of Integer) In results
    Debug.WriteLine(DateTime.Now & "Output: " & Await x)
Next
Remove() could be called from several threads at the same time. I think you should use ConcurrentDictionary instead. – Offshore
Depending on the code, the simplest approach might be to use Parallel.For(Each) and specify the maximum parallelism in the parallel options.
Parallel.For(Each) doesn't work well with async methods. – Offshore
I like this technique better. I'm using TaskCompletionSource to create output tasks for the incoming tasks. This is necessary because I want to return a Task before I even run it! The class below associates each input Func(Of Task(Of Object)) with a TaskCompletionSource, whose Task is returned immediately, and puts the pair into a queue.
Elements from the queue are dequeued into a list of running tasks, and a continuation sets the TaskCompletionSource. A call to WhenAny in a loop moves elements from the queue to the running list when room frees up. There's also a check to make sure that there isn't more than one WhenAny at a time, though it might have concurrency issues.
To use, just replace synchronous functions like this:
Task.Run(AddressOf MySyncFunction) 'possibly many of these
with this:
Dim t1 As New Throttler(4)
t1.Run(AddressOf MySyncFunction) 'many of these, but only 4 will run at a time.
For functions that already return a Task, it's important to wrap the call in a function that returns the Task, so that the throttler can start it itself. Replace:
NewTask = MyFunctionAsync()
with:
NewTask = t1.Run(Function() MyFunctionAsync())
The class below also implements many different signatures for Throttler.Run() depending on whether the function is sync/async, has/hasn't input, has/hasn't output. Converting Task to Task(Of Output) is especially tricky!
Class Throttler
    Property MaxCount As Integer
    Sub New(Optional MaxCount As Integer = 1)
        Me.MaxCount = MaxCount
    End Sub
    Private Running As New List(Of Task)
    Private Waiting As New Concurrent.ConcurrentQueue(Of System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)))
    Private AlreadyWaiting As Boolean
    Async Sub MakeWaiter()
        If AlreadyWaiting Then Exit Sub
        AlreadyWaiting = True
        Do While Waiting.Count > 0
            Dim CurrentWait As System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)) = Nothing
            Do While Running.Count < MaxCount AndAlso Waiting.TryDequeue(CurrentWait)
                Dim NewFunc As Func(Of Task(Of Object)) = CurrentWait.Item1
                Dim NewTask As Task(Of Object) = NewFunc()
                Dim CurrentTcs As TaskCompletionSource(Of Object) = CurrentWait.Item2
                NewTask.ContinueWith(Sub(t2 As Task(Of Object))
                                         CurrentTcs.SetResult(t2.Result)
                                     End Sub)
                Running.Add(NewTask)
            Loop
            If Waiting.Count > 0 Then
                Dim Waiter As Task(Of Task)
                Waiter = Task.WhenAny(Running)
                Dim FinishedTask As Task = Await Waiter
                Await FinishedTask
                Running.Remove(FinishedTask)
            End If
        Loop
        AlreadyWaiting = False
    End Sub
    Function Run(f As Func(Of Task(Of Object))) As Task(Of Object)
        Dim NewTcs As New TaskCompletionSource(Of Object)
        Waiting.Enqueue(New System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object))(f, NewTcs))
        MakeWaiter()
        Return NewTcs.Task
    End Function
    Function Run(Of TInput)(f As Func(Of TInput, Task), input As TInput) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return f(input)
               End Function
        Return Me.Run(NewF)
    End Function
    Function Run(Of TInput)(f As Func(Of TInput, Task(Of Object)), input As TInput) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return f(input)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function
    Function Run(f As Func(Of Task)) As Task
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return f().ContinueWith(Function(t As task) As Object
                                               Return Nothing
                                           End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function
    Function Run(Of TInput)(f As Func(Of TInput, Object), input As TInput) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return Task.Run(Function() As Object
                                       Return f(input)
                                   End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function
    Function Run(Of TInput)(f As Action(Of TInput), input As TInput) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return Task.Run(Sub()
                                       f(input)
                                   End Sub)
               End Function
        Return Me.Run(NewF)
    End Function
    Function Run(f As Func(Of Object)) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return Task.Run(Function()
                                       Return f()
                                   End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function
    Function Run(f As Action) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return Task.Run(Sub()
                                       f()
                                   End Sub)
               End Function
        Return Me.Run(NewF)
    End Function
End Class
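For comparison, the same queue-plus-TaskCompletionSource idea can be sketched compactly in C#. This is not a translation of the class above: the name SimpleThrottler is made up for illustration, a lock guards the shared state, and failures are forwarded to the returned task (an assumption about the desired behavior; cancellation is reported as an exception here as well).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical class: a sketch of the queue + TaskCompletionSource idea.
public sealed class SimpleThrottler
{
    private readonly int maxCount;
    private readonly object gate = new object();
    private readonly Queue<Func<Task>> waiting = new Queue<Func<Task>>();
    private int running;

    public SimpleThrottler(int maxCount)
    {
        if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
        this.maxCount = maxCount;
    }

    // Returns a Task immediately, even though f may not have been started yet.
    public Task<T> Run<T>(Func<Task<T>> f)
    {
        var tcs = new TaskCompletionSource<T>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        lock (gate)
        {
            waiting.Enqueue(async () =>
            {
                try { tcs.SetResult(await f()); }
                catch (Exception ex) { tcs.SetException(ex); } // forward failures
            });
        }
        StartNext();
        return tcs.Task;
    }

    // Starts queued work while there is room; the lock makes the
    // "is there room?" check and the dequeue one atomic step.
    private void StartNext()
    {
        while (true)
        {
            Func<Task> work;
            lock (gate)
            {
                if (running >= maxCount || waiting.Count == 0) return;
                work = waiting.Dequeue();
                running++;
            }
            _ = RunOne(work);
        }
    }

    private async Task RunOne(Func<Task> work)
    {
        // Yield first so that work completing synchronously cannot nest
        // StartNext calls ever deeper on the stack.
        await Task.Yield();
        try { await work(); }
        finally
        {
            lock (gate) { running--; }
            StartNext();
        }
    }
}
```

Usage would follow the same pattern as above, for example throttler.Run(() => MyFunctionAsync()) on a shared SimpleThrottler instance. Because of the Task.Yield call, the delegate is invoked from a continuation rather than directly inside Run.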
MakeWaiter() could run more than once at the same time, because you're not using AlreadyWaiting atomically. – Offshore
[…] ActionBlock is that I can't get it to run in the correct context. If it runs in the wrong context, UI manipulation in the task throws an exception. If it runs in the UI context, the call to Task.Wait halts the UI. I need the ActionBlock to run in a new context but the original Task to run in the original context. I don't see how. – Roshelle
[…] Wait() in an application that uses await, but wouldn't setting the TaskScheduler of the block do the trick? – Offshore
[…] await? There is only one asynchronous call at a time, e.g. await SomethingAsync(); await SomethingElseAsync(); : SomethingElseAsync won't be called until SomethingAsync completes. – Merrick