How to properly run multiple async tasks in parallel? [duplicate]

How do you run multiple asynchronous I/O tasks in parallel while making sure that no more than X I/O operations are running at the same time, without applying that limit to the pre- and post-I/O processing tasks?

Here is a scenario: say there are 1000 tasks. Each of them accepts a text string as an input parameter, transforms that text (pre-I/O processing), then writes the transformed text to a file. The goal is for the pre-processing logic to use 100% of the CPU cores while the I/O portion of the tasks runs with a maximum degree of parallelism of 10 (at most 10 files open for writing at any time).

Can you provide sample code showing how to do this with C# / .NET 4.5?

http://blogs.msdn.com/b/csharpfaq/archive/2012/01/23/using-async-for-file-access-alan-berman.aspx

Imaimage answered 29/5, 2012 at 14:37 Comment(2)
Rx 2.0 might be a good fit for this (throttling the second stage to 10 at a time) but I'm not familiar enough with it to say for sure. :-/ - Dingbat
Does this answer your question? Nesting await in Parallel.ForEach - Parodic

I think using TPL Dataflow for this would be a good idea: you create pre- and post-process blocks with unbounded parallelism, a file-writing block with limited parallelism and link them together. Something like:

var unboundedParallelismOptions =
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    };

var preProcessBlock = new TransformBlock<string, string>(
    s => PreProcess(s), unboundedParallelismOptions);

var writeToFileBlock = new TransformBlock<string, string>(
    async s =>
    {
        await WriteToFile(s);
        return s;
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

var postProcessBlock = new ActionBlock<string>(
    s => PostProcess(s), unboundedParallelismOptions);

var propagateCompletionOptions =
    new DataflowLinkOptions { PropagateCompletion = true };

preProcessBlock.LinkTo(writeToFileBlock, propagateCompletionOptions);
writeToFileBlock.LinkTo(postProcessBlock, propagateCompletionOptions);

// use something like await preProcessBlock.SendAsync("text") here

preProcessBlock.Complete();
await postProcessBlock.Completion;

Where WriteToFile() could look like this:

private static async Task WriteToFile(string s)
{
    using (var writer = new StreamWriter(GetFileName()))
        await writer.WriteAsync(s);
}
Bacchius answered 29/5, 2012 at 16:13 Comment(2)
What are the PreProcess and PostProcess methods here? - Promisee
@Promisee They do whatever is needed. The original question talks about "pre and post I/O processing tasks", so I represented that using methods. - Bacchius
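
For readers wondering how the pieces fit together, here is a minimal self-contained sketch of the pipeline from this answer. The PreProcess body (upper-casing), the counting post-process step, the GUID-based file names, and the PipelineSketch class with its RunAsync method are all assumptions made for illustration, since the answer leaves them unspecified. It also assumes the TPL Dataflow library (System.Threading.Tasks.Dataflow) is referenced.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Hypothetical pre-processing step: a CPU-bound text transformation.
    static string PreProcess(string s)
    {
        return s.ToUpperInvariant();
    }

    // Writes each item to its own file; the GUID-based name is an assumption.
    static async Task WriteToFile(string directory, string s)
    {
        string path = Path.Combine(directory, Guid.NewGuid().ToString("N") + ".txt");
        using (var writer = new StreamWriter(path))
            await writer.WriteAsync(s);
    }

    // Builds the three-stage pipeline, feeds it itemCount strings and
    // returns how many items reached the post-processing stage.
    public static async Task<int> RunAsync(string directory, int itemCount)
    {
        int postProcessed = 0;
        var unbounded = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var pre = new TransformBlock<string, string>(s => PreProcess(s), unbounded);

        // Only this stage is throttled: at most 10 writes run at once.
        var write = new TransformBlock<string, string>(
            async s => { await WriteToFile(directory, s); return s; },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

        // Hypothetical post-processing step: just count the item.
        var post = new ActionBlock<string>(
            s => { Interlocked.Increment(ref postProcessed); }, unbounded);

        pre.LinkTo(write, link);
        write.LinkTo(post, link);

        for (int i = 0; i < itemCount; i++)
            await pre.SendAsync("text " + i);

        // Completion propagates through the links to the last block.
        pre.Complete();
        await post.Completion;
        return postProcessed;
    }
}
```

Calling `await PipelineSketch.RunAsync(someDirectory, 1000)` should leave one file per item in the directory, assuming the directory already exists.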

It sounds like you'd want to consider a Dijkstra semaphore to control how many tasks are allowed to start at once.

However, this sounds like a typical queue with a fixed number of consumers, which may be a more appropriate way to structure it.
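
As a rough illustration of the queue with a fixed number of consumers, here is a minimal sketch. The FixedConsumers class, its RunAsync method, and the peak counter it returns are hypothetical names introduced for this example, not part of the answer. Each consumer loop pulls the next item from a shared queue, so the number of items in flight can never exceed the number of consumers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FixedConsumers
{
    // Runs 'work' for every item using 'consumerCount' consumer loops that
    // share one queue. Returns the highest number of work items observed
    // running at the same time (tracked only for illustration).
    public static async Task<int> RunAsync<T>(
        IEnumerable<T> items, int consumerCount, Func<T, Task> work)
    {
        var queue = new ConcurrentQueue<T>(items);
        var gate = new object();
        int running = 0, peak = 0;

        Func<Task> consumer = async () =>
        {
            T item;
            // Each consumer keeps taking items until the queue is empty.
            while (queue.TryDequeue(out item))
            {
                lock (gate)
                {
                    running++;
                    if (running > peak) peak = running;
                }
                try
                {
                    await work(item);
                }
                finally
                {
                    lock (gate) { running--; }
                }
            }
        };

        // Start exactly consumerCount consumers and wait for all of them.
        await Task.WhenAll(
            Enumerable.Range(0, consumerCount).Select(_ => Task.Run(consumer)));
        return peak;
    }
}
```

For the scenario in the question, the work delegate would be the file write and consumerCount would be 10, with pre-processing done separately before items are queued.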

Tysontyumen answered 29/5, 2012 at 14:40 Comment(0)

I would create an extension method that lets the caller set a maximum degree of parallelism, and use SemaphoreSlim to do the throttling.

    /// <summary>
    /// Concurrently executes an async action for each item of an <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the items in the sequence</typeparam>
    /// <param name="enumerable">The <see cref="IEnumerable{T}"/> whose items are processed</param>
    /// <param name="action">An async delegate to execute for each item</param>
    /// <param name="maxDegreeOfParallelism">Optional. The maximum degree of parallelism;
    /// must be greater than 0</param>
    /// <returns>A Task representing the async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Take a slot; this waits while the limit of running tasks is reached.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await action(item);
                        }
                        finally
                        {
                            // try/finally frees the slot even if the action throws,
                            // and lets the exception reach Task.WhenAll.
                            semaphoreSlim.Release();
                        }
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Martinic answered 9/5, 2018 at 23:10 Comment(3)
Hi, I used it in a thread. I tried to stop the thread with the Abort function, but the ForEachAsyncConcurrent task keeps running. Do you have a solution for this issue? - Wetmore
@TienNguyen I would add a CancellationToken as a parameter of the ForEachAsyncConcurrent method and cancel it when you stop the thread. - Martinic
Can you update your sample code with a CancellationToken? Thank you so much! - Wetmore
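
A cancellable variant along the lines suggested in the comments might look like the sketch below. It is not the answer author's code: the CancellableForEach class name, the extra CancellationToken passed to the action, and the choice to let already-started items finish before rethrowing are all assumptions. Once the token is cancelled, no new items are started.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableForEach
{
    // Hypothetical cancellable version of ForEachAsyncConcurrent.
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, CancellationToken, Task> action,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken)
    {
        using (var semaphore = new SemaphoreSlim(
            maxDegreeOfParallelism, maxDegreeOfParallelism))
        {
            var tasks = new List<Task>();

            foreach (var item in enumerable)
            {
                try
                {
                    // Waits for a free slot, or throws once cancellation is requested.
                    await semaphore.WaitAsync(cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    break; // stop starting new items
                }

                tasks.Add(Task.Run(async () =>
                {
                    try
                    {
                        // The action is expected to observe the token itself.
                        await action(item, cancellationToken);
                    }
                    finally
                    {
                        semaphore.Release();
                    }
                }));
            }

            // Wait for items that were already started; their exceptions
            // (including cancellation) surface here.
            await Task.WhenAll(tasks);

            // Report cancellation even if every started item ran to completion.
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
}
```

The caller would create a CancellationTokenSource, pass its Token to the method, and call Cancel() instead of aborting the thread; the awaited call then ends with an OperationCanceledException.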
