Stopping Parallel.ForEach with Cancellation Token and Stop

I'm not sure whether I'm stopping a Parallel.ForEach loop the way I intend to, so let me outline the problem.

The loop uses a database driver with a limited number of available connections, so the open connections must be tracked to keep the driver from throwing an exception. The tracking is currently implemented manually (this should be refactored, by writing a wrapper or using AutoResetEvent, but some other things need to be taken care of first). So I need to keep track of the open connections, and in particular I have to handle the case of an exception:

Parallel.ForEach(hugeLists, parallelOptions, currentList => {
  WaitForDatabaseConnection();
  try {
     Interlocked.Increment(ref numOfOpenConnections);  
     DoDatabaseCallAndInsertions();
  } catch (Exception ex) {
     // logging
     throw;
  } finally {
     Interlocked.Decrement(ref numOfOpenConnections);
  } 
});

This is the simplified version of the loop without cancellation. To improve performance, the loop should be cancelled as soon as possible when an exception is thrown: if one iteration fails, the whole loop should stop.

How can I achieve that while making sure that numOfOpenConnections is still updated correctly?

What I have tried so far (is this behaving like I want it to or am I missing something?):

Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {
  parallelOptions.CancellationToken.ThrowIfCancellationRequested();
  WaitForDatabaseConnection();
  try {     
     Interlocked.Increment(ref numOfOpenConnections);  
     DoDatabaseCallAndInsertions();
  } catch (Exception ex) {
     // logging
     cancellationTokenSource.Cancel();  
     parallelLoopState.Stop();
     throw; // still want to preserve the original exception information
  } finally {
     Interlocked.Decrement(ref numOfOpenConnections);
  } 
});

I could wrap this code in a try-catch block and catch AggregateException.
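As a minimal sketch of that idea (not the real database code): the hypothetical Process method below stands in for the loop body and fails for a single item, and the surrounding try-catch unwraps the AggregateException that Parallel.ForEach throws. The snippet assumes C# 9 top-level statements so that it can run as a single file.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the real loop body: only item 3 fails.
static void Process(int item)
{
    if (item == 3) throw new InvalidOperationException("item 3 failed");
}

var messages = new List<string>();
try
{
    Parallel.ForEach(Enumerable.Range(0, 10), item => Process(item));
}
catch (AggregateException aggregate)
{
    // Exceptions thrown by the loop body are collected in InnerExceptions.
    foreach (Exception inner in aggregate.InnerExceptions)
    {
        messages.Add(inner.Message);
        Console.WriteLine($"{inner.GetType().Name}: {inner.Message}");
    }
}
```

The original exception type and message survive inside InnerExceptions, so rethrowing with a plain throw; in the loop body does not lose information.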

Kookaburra answered 9/6, 2020 at 10:7 Comment(13)
Why are you calling parallelLoopState.Stop and using a CancellationTokenSource? Does it make the termination of the parallel loop any faster than just letting the method terminate by itself? AFAIK an unhandled exception in the lambda will make the parallel loop terminate as soon as all currently running lambdas have completed. - Piezoelectricity
@TheodorZoulias you're right, the WaitForDatabaseConnection is placed outside the for loop - I overlooked this when simplifying it for stackoverflow. - Kookaburra
@TheodorZoulias I thought that with parallelLoopState.Stop it would still be possible for new Tasks to enter, but actually this can't happen. That is why I thought I needed a cancellation token... Do I always get an exception of type AggregateException if an exception is thrown in the loop body? - Kookaburra
Yep, according to the documentation any exception inside the lambda will result in a final AggregateException. There is a possibility for other exceptions to be thrown (OperationCanceledException, ArgumentNullException and ObjectDisposedException), but these are unrelated to what happens inside the lambda. - Piezoelectricity
Ty, the other exceptions should not occur because I don't need cancellation tokens. - Kookaburra
You could consider passing the parallelLoopState as a parameter to the DoDatabaseCallAndInsertions method, frequently inspecting the property ShouldExitCurrentIteration there, and exiting fast in case it has become true. This should increase the responsiveness of the parallel loop in case of an exception. - Piezoelectricity
@TheodorZoulias I'll consider that. Do you think it is feasible to do this with a timer? E.g. if there is an expensive database query I need to stop the query, and therefore I would need some asynchronous timer to check for DoDatabaseCallAndInsertions. - Kookaburra
Is this expensive database query cancelable? If not, are you OK with exiting the parallel loop early (in case of an exception) while some queries are still running? - Piezoelectricity
Yes, the database query inserts into temporary tables and uses transactions, so it is safe to abort the query at any given time. The database system just aborts the transaction and rolls back. - Kookaburra
Aborting the transaction and rolling back may take some time. Do you want to wait for the abort-rollback to complete, or just exit the parallel loop as fast as possible and let the database do its job unattended? - Piezoelectricity
The second one - just exit as fast as possible and let the database do the cleanup. - Kookaburra
OK. One more question: how are you communicating the request for a transaction rollback to the database? Is it enough, for example, to pass a CancellationToken to the data access layer's Execute method? - Piezoelectricity
Sadly there is no real data access layer - there are only direct calls like executeGeneric on the Oracle database without abstraction (ODP oracle.com/webfolder/technetwork/tutorials/obe/db/dotnet/…) - Kookaburra
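The suggestion in the comments to poll ShouldExitCurrentIteration could look roughly like the sketch below. The step loop with Thread.Sleep is a hypothetical stand-in for a long operation that can be split into smaller pieces; item 0 fails on purpose. The snippet assumes C# 9 top-level statements.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

string firstFailure = "";

try
{
    Parallel.ForEach(Enumerable.Range(0, 8), (item, loopState) =>
    {
        // Simulated failure in one iteration.
        if (item == 0) throw new InvalidOperationException("item 0 failed");

        // Hypothetical long-running work, split into small steps.
        for (int step = 0; step < 20; step++)
        {
            // Becomes true once another iteration has thrown, or after
            // Stop/Break or cancellation; leave early instead of finishing.
            if (loopState.ShouldExitCurrentIteration) return;
            Thread.Sleep(10);
        }
    });
}
catch (AggregateException aggregate)
{
    firstFailure = aggregate.InnerExceptions[0].Message;
    Console.WriteLine($"Loop ended early: {firstFailure}");
}
```

This only helps when the work can be interrupted between steps; a single blocking database call would not observe the flag until it returns.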

You could call the DoDatabaseCallAndInsertions method in a way that waits for its completion only while the state of the loop is not exceptional, and otherwise forgets about it and allows the parallel loop to complete immediately. Using a cancelable wrapper is probably the simplest way to achieve this. Here is a method RunAsCancelable that waits for a function to complete or for a CancellationToken to become canceled, whichever comes first:

public static TResult RunAsCancelable<TResult>(Func<TResult> function,
    CancellationToken token)
{
    token.ThrowIfCancellationRequested();
    Task<TResult> task = Task.Run(function, token);
    try
    {
        // Wait for the function to complete, or the token to become canceled
        task.Wait(token);
    }
    catch { } // Swallow here; the outcome is rethrown in unwrapped form below

    token.ThrowIfCancellationRequested();
    // Propagate the result, or the original exception unwrapped
    return task.GetAwaiter().GetResult();
}

public static void RunAsCancelable(Action action, CancellationToken token)
    => RunAsCancelable<object>(() => { action(); return null; }, token);

The RunAsCancelable method throws an OperationCanceledException if the token was canceled before the action completed, propagates the exception that occurred in the action, or completes successfully if the action completed successfully.
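The three outcomes described above can be checked with a small sketch. RunAsCancelable is repeated here as a local function (an adaptation so that the snippet runs as a single file with C# 9 top-level statements); the lambdas and the 50 ms / 500 ms timings are arbitrary illustrative choices.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Same logic as the method above, written as a local function.
static TResult RunAsCancelable<TResult>(Func<TResult> function, CancellationToken token)
{
    token.ThrowIfCancellationRequested();
    Task<TResult> task = Task.Run(function, token);
    try { task.Wait(token); } catch { }
    token.ThrowIfCancellationRequested();
    return task.GetAwaiter().GetResult();
}

// 1. The function completes: its result is returned.
int success = RunAsCancelable(() => 42, CancellationToken.None);

// 2. The function throws: the original exception arrives unwrapped.
string failure = "";
try
{
    RunAsCancelable<int>(() => throw new InvalidOperationException("boom"),
        CancellationToken.None);
}
catch (InvalidOperationException ex) { failure = ex.Message; }

// 3. The token is canceled first: OperationCanceledException is thrown,
//    while the function keeps running unobserved in the background.
bool canceled = false;
using (var cts = new CancellationTokenSource())
{
    cts.CancelAfter(50);
    try
    {
        RunAsCancelable(() => { Thread.Sleep(500); return 0; }, cts.Token);
    }
    catch (OperationCanceledException) { canceled = true; }
}

Console.WriteLine($"{success}, {failure}, {canceled}");
```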

Usage example:

using (var failureCTS = new CancellationTokenSource()) // Communicates failure
{
    Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>
    {
        WaitForDatabaseConnection();
        try
        {
            Interlocked.Increment(ref numOfOpenConnections);
            RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),
                failureCTS.Token);
        }
        catch (OperationCanceledException ex)
            when (ex.CancellationToken == failureCTS.Token)
        {
            // Do nothing (an exception occurred in another thread)
        }
        catch (Exception ex)
        {
            Log.Error(ex);
            failureCTS.Cancel(); // Signal failure to the other threads
            throw; // Inform the parallel loop that an error has occurred
        }
        finally
        {
            Interlocked.Decrement(ref numOfOpenConnections);
        }
    });
}

The DoDatabaseCallAndInsertions method can inspect the property IsCancellationRequested of the CancellationToken parameter at various points, and perform a transaction rollback if needed.
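A rough sketch of such cooperative checks follows. It uses no real database API: the batches parameter and the log list are hypothetical stand-ins, and the "begin", "commit" and "rollback" entries only mark where the real transaction calls would go. The snippet assumes C# 9 top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the database: records what would have happened.
var log = new List<string>();

void DoDatabaseCallAndInsertions(IReadOnlyList<int[]> batches, CancellationToken token)
{
    log.Add("begin");
    foreach (int[] batch in batches)
    {
        // Check between batches whether another iteration has failed.
        if (token.IsCancellationRequested)
        {
            log.Add("rollback"); // a real implementation would roll back here
            return;
        }
        log.Add($"insert batch of {batch.Length}");
    }
    log.Add("commit");
}

var batches = new[] { new[] { 1, 2 }, new[] { 3 } };
using (var failureCTS = new CancellationTokenSource())
{
    DoDatabaseCallAndInsertions(batches, failureCTS.Token); // runs to commit
    failureCTS.Cancel();                                    // simulate a failure elsewhere
    DoDatabaseCallAndInsertions(batches, failureCTS.Token); // rolls back instead
}

Console.WriteLine(string.Join(", ", log));
```

Returning quietly after the rollback is enough in this design, because by that time RunAsCancelable has already reported the cancellation to the loop and nobody is waiting for the result.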

Note that the RunAsCancelable method is quite wasteful in its use of ThreadPool threads. One extra thread must be blocked to make each supplied action cancelable, so two threads are needed for each execution of the lambda. To prevent possible starvation of the ThreadPool, it is probably a good idea to increase the minimum number of threads that the thread pool creates on demand before switching to the create-one-every-500-msec algorithm, by calling the ThreadPool.SetMinThreads method at application startup:

ThreadPool.SetMinThreads(100, 10);
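A slightly more defensive variant, as a sketch: it reads the current minimums first, raises only the worker-thread minimum to the 100 used above, and leaves the I/O completion minimum untouched. Preserving the I/O value is a choice made for this example, not something the answer requires. The snippet assumes C# 9 top-level statements.

```csharp
using System;
using System.Threading;

// Read the current minimums so the call never lowers an existing setting.
ThreadPool.GetMinThreads(out int workerMin, out int ioMin);

int desiredWorkerMin = Math.Max(workerMin, 100);

// SetMinThreads returns false if the request is rejected, for example
// when a value exceeds the pool's maximum.
bool accepted = ThreadPool.SetMinThreads(desiredWorkerMin, ioMin);

Console.WriteLine(accepted
    ? $"Worker minimum set to {desiredWorkerMin}"
    : "SetMinThreads rejected the requested values");
```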

Important: The above solution makes no attempt to log the possible exceptions of the operations that have been forgotten. Only the exception of the first failed operation will be logged.

Piezoelectricity answered 9/6, 2020 at 16:27 Comment(3)
Ty - I will think about it. For the time being I'll just use Stop because I don't want to complicate things too much, and I'll have to decide whether this additional performance gain is worth the cost of complicating the code base - maybe after some more refactoring work. - Kookaburra
@MatthiasHerrmann yep, it's a bit complicated indeed. In case you eventually make the effort to refactor the code, I would suggest taking a look at the TPL Dataflow library. It is a much more powerful alternative to the Parallel class. Unlike Parallel.ForEach it supports asynchronous lambdas, so the cancelable wrapper could be made asynchronous (without wasting threads). - Piezoelectricity
@MatthiasHerrmann the TPL Dataflow has some learning curve though. It may need 1-2 days of study before you can be productive with it. Its selling point is its ability to perform task-parallelism on top of data-parallelism. In other words, you could configure different parts of the work to be processed independently from the other parts, with a different degree of parallelism for each part. This is done by forming a "pipeline" consisting of linked processing "blocks", with the data flowing from one block to the next. - Piezoelectricity
