Stop Parallel.ForEachAsync

In C#, I am interested in stopping a Parallel.ForEachAsync loop (considering the differences between Stop and Break); for Parallel.ForEach I can do the following:

Parallel.ForEach(items, (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        state.Stop();
        return;
    }

    // some process on the item
    Process(item);
});

However, since I have a process that needs to be executed asynchronously, I switched to Parallel.ForEachAsync. ForEachAsync does not have a Stop() method. I'm able to break the loop as follows, but I'm wondering whether this is the most effective way of doing it (in other words, the loop needs to stop as soon as possible when it receives the cancellation request).

await Parallel.ForEachAsync(items, async (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    // some async process on the item
    await ProcessAsync(item);
});
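Since ParallelLoopState is not available in Parallel.ForEachAsync, one way to approximate Stop() when the decision is made inside the body is to cancel a private CancellationTokenSource that was passed to the loop through ParallelOptions. This is a minimal sketch under that assumption (.NET 6 or later, C# top-level statements); ForEachWithStopAsync, shouldStop and process are hypothetical names, not part of the .NET API. Canceling the source stops the loop from starting new items and cancels the token handed to in-flight bodies; the awaited loop then throws an OperationCanceledException, which the helper turns into a false return value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: emulates ParallelLoopState.Stop() by canceling a private
// CancellationTokenSource that the loop receives through ParallelOptions.
// Returns true if the loop ran to completion, false if it was stopped early.
static async Task<bool> ForEachWithStopAsync<T>(
    IEnumerable<T> items,
    Func<T, bool> shouldStop,                       // hypothetical stop condition
    Func<T, CancellationToken, ValueTask> process)  // hypothetical async work
{
    using var stopCts = new CancellationTokenSource();
    var options = new ParallelOptions { CancellationToken = stopCts.Token };
    try
    {
        await Parallel.ForEachAsync(items, options, async (item, ct) =>
        {
            if (shouldStop(item))
            {
                stopCts.Cancel(); // the loop starts no new items after this
                return;
            }
            await process(item, ct); // ct is canceled too, so cooperative work ends early
        });
        return true;
    }
    catch (OperationCanceledException) when (stopCts.IsCancellationRequested)
    {
        return false;
    }
}

int processed = 0;

bool completed = await ForEachWithStopAsync(
    Enumerable.Range(1, 1000),
    shouldStop: i => i == 1,          // stop as soon as item 1 is seen
    process: async (i, ct) =>
    {
        await Task.Delay(10, ct);     // stand-in for a real ProcessAsync(item, ct)
        Interlocked.Increment(ref processed);
    });

Console.WriteLine($"completed={completed}, processed={processed}");
```

Unlike Stop(), this cannot interrupt bodies that are already running; they only finish early if they honor the ct they receive.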
Inchmeal answered 24/1, 2022 at 0:14 Comment(1)
This question is relevant as well. It uses a flag instead of cancellationToken. – Screwworm

The Parallel.ForEachAsync body delegate has a CancellationToken as its second parameter. This token is supplied by the API; it's not the same token that you passed in the ParallelOptions. You can forward this token to any asynchronous method that you invoke inside the lambda. If you invoke non-cancelable methods, the best you can do is to call ThrowIfCancellationRequested at strategic places inside the lambda:

CancellationTokenSource cts = new();
ParallelOptions options = new() { CancellationToken = cts.Token };

try
{
    await Parallel.ForEachAsync(items, options, async (item, ct) =>
    {
        //...
        ct.ThrowIfCancellationRequested();
        //...
        await ProcessAsync(item, ct);
        //...
        ct.ThrowIfCancellationRequested();
        //...
    });
}
catch (OperationCanceledException)
{
    // ... (the cts was canceled)
}
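A small sketch of the two effects described above, assuming .NET 6 or later and top-level statements: canceling cts (passed via ParallelOptions) stops the loop from starting further items, and the ct received by the lambda is a different token from cts.Token. Task.Delay stands in for the hypothetical ProcessAsync, and the 200 ms timeout and 50 ms delay are arbitrary.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var options = new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = 4 };
int started = 0;
bool distinctToken = false;
bool canceled = false;

cts.CancelAfter(TimeSpan.FromMilliseconds(200)); // simulate an external cancellation request

try
{
    await Parallel.ForEachAsync(Enumerable.Range(1, 1000), options, async (item, ct) =>
    {
        Interlocked.Increment(ref started);
        if (ct != cts.Token) distinctToken = true; // the lambda's token is not cts.Token
        await Task.Delay(50, ct);                  // stand-in for ProcessAsync(item, ct)
    });
}
catch (OperationCanceledException)
{
    canceled = true; // the loop observed the cancellation of cts
}

Console.WriteLine($"canceled={canceled}, started={started}, distinctToken={distinctToken}");
```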

The token provided as an argument to the lambda (ct in the example above) is canceled not only when the ParallelOptions.CancellationToken is canceled, but also when a ProcessAsync operation (or anything else in the body) fails. This mechanism allows faster propagation of exceptions. The parallel loop does not complete immediately when an error occurs, because it follows the principle of disallowing fire-and-forget operations: all operations started internally by the loop must complete before the whole loop completes, either successfully or with failure. The token in the lambda makes it possible to reduce this latency to a minimum.
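To illustrate the faster propagation of exceptions, here is a minimal sketch (assuming .NET 6 or later) in which item 3 throws while the other items are inside a 30-second operation that honors ct. Because the failure cancels ct, the loop should complete shortly after the failure rather than after 30 seconds, and the faulted task's exception should contain the original InvalidOperationException, possibly alongside cancellation exceptions from the other items. The item numbers and delays are arbitrary.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 }; // run all 4 items concurrently
var stopwatch = Stopwatch.StartNew();

Task loop = Parallel.ForEachAsync(Enumerable.Range(1, 4), options, async (item, ct) =>
{
    if (item == 3)
    {
        await Task.Delay(100);
        throw new InvalidOperationException($"Item {item} failed"); // simulated ProcessAsync failure
    }
    // Long-running work that honors ct; it is canceled as soon as item 3 fails.
    await Task.Delay(TimeSpan.FromSeconds(30), ct);
});

try
{
    await loop;
}
catch (Exception)
{
    // await rethrows only one exception; loop.Exception holds all of them.
}
stopwatch.Stop();

var errors = loop.Exception!.InnerExceptions;
Console.WriteLine($"Elapsed: {stopwatch.Elapsed.TotalSeconds:F1}s");
foreach (var e in errors) Console.WriteLine(e.GetType().Name);
```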

Retene answered 24/1, 2022 at 0:49 Comment(2)
What's the difference between ParallelOptions.CancellationToken and the one you get in your callback (ct in your example)? If the one in options gets cancelled, will the one in your callback be cancelled as well? – Screwworm
Here is a relevant question about this: The need for two cancellation tokens in .NET 6 Parallel.ForEachAsync? – Retene

You're going to need something like this:

await Parallel.ForEachAsync(items, async (item, state) =>
{
    await ProcessAsync(item, cancellationToken);
});


async Task ProcessAsync(string item, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        //Process
    }
}
Coagulate answered 24/1, 2022 at 0:22 Comment(1)
The drawback of this approach is how slowly it breaks the loop. For instance, if there are 1e9 items and cancellation is requested just as the first item starts, this approach will still iterate through all the remaining 1e9 - 1 items before it returns. – Inchmeal
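The drawback described in this comment can be measured with a small sketch (assuming .NET 6 or later; the item count and the cancel-on-first-item trigger are arbitrary). When the token is only checked inside the body, the loop still hands all N items to the body; when the token is passed to the Parallel.ForEachAsync overload that accepts a CancellationToken, the loop stops taking new items shortly after cancellation.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int N = 100_000;

// 1) Token checked only inside the body, as in the answer above:
//    the loop keeps handing out every item even after cancellation.
using var cts1 = new CancellationTokenSource();
int visitedWithoutToken = 0;
await Parallel.ForEachAsync(Enumerable.Range(0, N), (item, _) =>
{
    if (Interlocked.Increment(ref visitedWithoutToken) == 1) cts1.Cancel(); // cancel on the first item
    if (cts1.Token.IsCancellationRequested) return ValueTask.CompletedTask;  // skip the work
    // ... work for the item would go here ...
    return ValueTask.CompletedTask;
});

// 2) Token passed to the loop itself: no new items are started after cancellation.
using var cts2 = new CancellationTokenSource();
int visitedWithToken = 0;
try
{
    await Parallel.ForEachAsync(Enumerable.Range(0, N), cts2.Token, (item, ct) =>
    {
        if (Interlocked.Increment(ref visitedWithToken) == 1) cts2.Cancel(); // cancel on the first item
        return ValueTask.CompletedTask;
    });
}
catch (OperationCanceledException)
{
    // expected: the loop observed the cancellation and completed as canceled
}

Console.WriteLine($"without token: {visitedWithoutToken} visited, with token: {visitedWithToken} visited");
```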
