Throttling asynchronous tasks

I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

I also want to utilize a given number of threads if possible.

I came up with an extension method, ThrottleTasksAsync, that does what I want. Is there a simpler solution already out there? I would assume that this is a common scenario.

Usage:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

Here is the code:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        // Collect the tasks in a thread-safe bag, since Parallel.ForEach adds them from multiple threads.
        var taskList = new ConcurrentBag<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

The method utilizes BlockingCollection and SemaphoreSlim to make it work. The throttler runs on one thread, and the async tasks are started from the handful of threads used by the consumer loop. To achieve parallelism, I added a maxDegreeOfParallelism parameter that's passed to a Parallel.ForEach loop re-purposed as a while loop.

The old version was:

foreach (var master in ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

But the thread pool gets exhausted fast, and you can't do async/await inside Parallel.ForEach.
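
As an aside, here's a hypothetical sketch (urls and httpClient are placeholders) of why async/await and Parallel.ForEach don't mix: an async lambda passed to Parallel.ForEach compiles to async void, so the loop considers each item finished at its first await.

Parallel.ForEach(urls, async url =>
{
    // The loop does NOT wait here; it treats the item as complete
    // as soon as the lambda hits its first await.
    var data = await httpClient.GetStringAsync(url);
});
// Execution reaches this point while downloads may still be in flight.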

Bonus: To get around the problem in BlockingCollection where an exception is thrown in Take() when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.
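
For what it's worth, TPL Dataflow's BufferBlock<T> can serve as exactly that kind of awaitable queue. A minimal sketch (placeholder names, assuming the Dataflow package is referenced):

var queue = new BufferBlock<int>();

// Producer side
queue.Post(42);
queue.Complete();

// Consumer side: an awaitable "take". OutputAvailableAsync is checked first,
// because ReceiveAsync throws once the block is completed and drained.
// (Fine for a single consumer; multiple consumers could race between the
// availability check and the receive.)
while (await queue.OutputAvailableAsync())
{
    int item = await queue.ReceiveAsync();
    Console.WriteLine(item);
}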

Vitovitoria answered 18/3, 2014 at 22:28 Comment(8)
Is there a better way? yes, TPL Dataflow.Abruption
For the URL example you can put all URLs in a ConcurrentBag, start 50 threads, and in each thread take a URL and perform a request until the bag is empty.Watchmaker
For general case use a ConcurrentBag of delegates :)Watchmaker
@Watchmaker I'll be doing thousands of requests though, and I want to run them all on the same thread using await. The Parallel.ForEach achieves the effect of 2 or 4 concurrent while loops.Vitovitoria
@Scott Chamberlain What specific use of TPL Dataflow would improve my situation?Vitovitoria
I asked a similar question (#21170423). Dataflow and Rx seem to be the most interesting candidates. Meanwhile, I have tested Dataflow and it works nicely.Hydrotaxis
Related: How to limit the amount of concurrent async I/O operations?Marcum
see also throttling-asynchronous-methods-in-csharp or throttling-async-taskWoeful

As suggested, use TPL Dataflow.

A TransformBlock<TInput, TOutput> may be what you're looking for.

You define a MaxDegreeOfParallelism to limit how many strings can be transformed (i.e., how many URLs can be downloaded) in parallel. You then post URLs to the block; when you're done, you tell the block you're finished adding items, and you fetch the responses.

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach (var url in urls)
    downloader.Post(url); // or: await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

Note: The TransformBlock buffers both its input and output. Why, then, do we need to link it to a BufferBlock?

Because the TransformBlock won't complete until all items (HttpResponse) have been consumed, await downloader.Completion would hang. Instead, we let the downloader forward all its output to a dedicated buffer block; then we wait for the downloader to complete, and inspect the buffer block.
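
(Download is left undefined above; for illustration, a minimal hypothetical stand-in using HttpClient would return HttpResponseMessage, in which case the block would be declared as TransformBlock<string, HttpResponseMessage>:)

static readonly HttpClient client = new HttpClient();

// Hypothetical stand-in for Download. TransformBlock also accepts an async
// delegate (Func<TInput, Task<TOutput>>), so this can be passed directly.
static Task<HttpResponseMessage> Download(string url)
{
    return client.GetAsync(url);
}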

Annalist answered 18/3, 2014 at 22:51 Comment(12)
+1 What an elegant solution. So little code, so much functionality.Screen
Hopefully my code at least illustrated the point :) In my case, the parallelism would probably be low, but many async tasks would be posted. How do I throttle the async tasks using this model?Vitovitoria
@JoshWyant You mean limit how many urls can be downloaded at once? Using MaxDegreeOfParallelismAnnalist
say you have 2 cores. Instead of creating 50 threads, set MaxDegreeOfParallelism to 2. On each core, you want to open 25 async http requests. When one request completes, there are 49 requests pending on one of the cores. await SendAsync() will allow you to post one more Task<HttpResponse>, bringing the pending number of requests back up to 50?Vitovitoria
@JoshWyant With the code above, you just post as many URLs as you want (using SendAsync). These will be buffered by the block. The block will keep taking URLs from the buffer, never processing more than 50 at a time. The results will then be put in another buffer. A TransformBlock buffers both its input and output.Annalist
What I'm saying is I want my degree of parallelism (# of concurrent operations) to be different than the number of Tasks pending completion. I can set the degree of parallelism to 1 and still spawn 1000 HTTP Requests using async/await. In my example code, the first argument would be 1000 and the second would be 1.Vitovitoria
@JoshWyant By the way, you can't assign a thread to a core in C# (well, at least not in an easy way); that's called thread affinity. Generally, you'd want to have no more threads than cores. So with 4 cores, you'd have at most 4 threads. But since this seems to be async I/O rather than CPU-bound work, I think it should be fine to use more than that. Try it out, see what works best.Annalist
@JoshWyant I'm not sure I follow your last comment... Your question says you want to limit the number of downloads happening concurrently. Do you have any other requirements?Annalist
@Annalist Here's what's happening. We're processing millions of objects. Each unit of work is several hundred objects. For each object, there's a GET API call, a POST, and a db call, and each will block. With Parallel.ForEach max degree = 15 (most efficient, uses 15 threads), we're getting a throughput of 13.5 objects per second. I have a feeling that making requests using Async/Await, we can achieve a much greater throughput, since the I/O will not be blocking and we'll be limiting the number of threads. I just want to throttle it for memory requirements and to minimize timeouts.Vitovitoria
@JoshWyant If I followed correctly, I would do this: Have a TransformBlock that performs both GET and POST requests sequentially with a limit of, say, 15. This way, you'd have 15 requests being performed at once, at most (e.g., 10 GETs + 5 POSTs). Then, I'd link that to another ActionBlock that will take the previous output and store it on the database. Does that sound right to you? Also, make sure you're using the proper async APIs to achieve true async I/O.Annalist
Also, take a look at the Introduction to TPL Dataflow, great stuff.Annalist
@Annalist So, I ultimately went with the Dataflow solution. My initial fear was that MaxDegreeOfParallelism worked exactly like Parallel.ForEach, just creating an arbitrary amount of threads to achieve parallelism. I was wrong, and the parameter works very nicely with async. Tpl.Dataflow works beautifully. Thanks!Vitovitoria

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

The following simple solution has surfaced many times here on SO. It doesn't use blocking code and doesn't create threads explicitly, so it scales very well:

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}
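
For illustration, a hypothetical driver for the snippet above (the example.com URLs are just placeholders):

static void Main(string[] args)
{
    var urls = Enumerable.Range(1, 1000)
        .Select(i => "http://example.com/item/" + i)
        .ToArray();

    DownloadAsync(urls).Wait();
}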

The thing is, the processing of the downloaded data should be done on a different pipeline, with a different level of parallelism, especially if it's CPU-bound processing.

E.g., you'd probably want to have 4 threads concurrently doing the data processing (the number of CPU cores), and up to 50 pending requests for more data (which do not use threads at all). AFAICT, this is not what your code is currently doing.

That's where TPL Dataflow or Rx may come in handy as a preferred solution. Yet it is certainly possible to implement something like this with plain TPL. Note, the only blocking code here is the one doing the actual data processing inside Task.Run:

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simulate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}
Crelin answered 19/3, 2014 at 0:6 Comment(9)
This is the simplest and most straightforward answer. It's pretty much exactly what I was trying to do. My mistake was trying to run the semaphore on a separate thread, but this makes it so much simpler, and eliminates the BlockingCollection. I just didn't realize I could use WaitAsync that way. Thank you @Noseratio.Vitovitoria
@JoshWyant, no problem. I believe this is pretty much what TPL Dataflow would do behind the scenes, if its pipeline is properly designed and assembled. It's just that I lack enough TPL Dataflow skills, but I'm going to invest more time into it.Crelin
You're right. Once you understand it, TPL Dataflow works beautifully. It handles the problem of distributing the async work to multiple cores, which was my other goal. This answer addresses my first goal, and Dataflow addresses them both. @NoseratioVitovitoria
Be careful if you test this with the default new HttpClient() in .NET Core when hitting the same endpoint. By default it limits the number of connections per server (saw that in Fiddler, where it was limiting it to 2), unless you specify new HttpClient(new HttpClientHandler { MaxConnectionsPerServer = ... }). Everything in this answer works as advertised, but you can still be limited by that setting.Gestate
I don't quite understand... Do you recommend or discourage using the simple semaphore solution? And if yes, is it OK to use the Slim version? Documentation states it's suitable only for very short waits.Proteus
@SerG, this post was meant to show how it could be done at the low-level. I'd still recommend using TPL Dataflow if your project time/dependency constraints allow that. What part of the docs says SemaphoreSlim is only suitable for short waits?Crelin
Here it's like "slim" means a CPU-consuming SpinLock.Proteus
They say in the docs: The SemaphoreSlim class represents a lightweight, fast semaphore that can be used for waiting within a single process when wait times are expected to be very short. I'd say, it means SemaphoreSlim is better optimized for short waits than the original Semaphore (which always wraps a Win32 semaphore object). It does not mean though SemaphoreSlim is only suitable for short waits and should be avoided for long waits, IMO.Crelin
Semaphore is simpler and more rookie-friendly than TPL Dataflow :-)Woeful

As requested, here's the code I ended up going with.

The work is set up in a master-detail configuration, and each master is processed as a batch. Each unit of work is queued up in this fashion:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

Masters are buffered one at a time to save work for other outside processes. The details for each master are dispatched for work via the masterTransform TransformManyBlock. A BatchedJoinBlock is also created to collect the details in one batch.

The actual work is done in the detailTransform TransformBlock, asynchronously, 150 at a time. BoundedCapacity is set to 300 to ensure that too many Masters don't get buffered at the beginning of the chain, while also leaving room for enough detail records to be queued to allow 150 records to be processed at one time. The block's output type is object, because the output is filtered across the links depending on whether each item is a Detail or an Exception.

The batchAction ActionBlock collects the output from all the batches, and performs bulk database updates, error logging, etc. for each batch.

There will be several BatchedJoinBlocks, one for each master. Since the detail records flow out of each ISourceBlock sequentially, and each batch only accepts the number of detail records associated with one master, the batches will be processed in order. Each block only outputs one group, and is unlinked on completion. Only the last batch block propagates its completion to the final ActionBlock.

The dataflow network:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
Vitovitoria answered 18/3, 2014 at 22:28 Comment(0)
