Parallel.ForEach loop with BlockingCollection.GetConsumingEnumerable

Why does the Parallel.ForEach loop exit with an OperationCanceledException when using GetConsumingEnumerable?

//outside the function
static BlockingCollection<double> _collection = new BlockingCollection<double>();
    
    
var t = Task.Factory.StartNew(Producer);            
Parallel.ForEach(_collection.GetConsumingEnumerable(),
    item => Console.WriteLine("Processed {0}", item));
Console.WriteLine("FINISHED processing");


public static void Producer()
{
     var data = Enumerable.Range(1, 1000);
     foreach (var i in data)
     {
        _collection.Add(i);
        Console.WriteLine("Added {0}",i);
     }
                    
     Console.WriteLine("Finished adding");
     _collection.CompleteAdding();
}
Pamalapamela answered 7/7, 2011 at 8:50 Comment(1)
I can't reproduce the OperationCanceledException behavior on .NET 7. The code in the question runs to completion and no exception is thrown. – Locarno
R
26

Using Parallel.ForEach with BlockingCollection is somewhat problematic, as I found out recently. It can be made to work, but it needs a little extra effort.

Stephen Toub has an excellent blog post on it, and if you download the "Parallel Extensions Extras" project (also available on NuGet) you'll find some code ready to help you.
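
To give an idea of what such helper code can look like, here is a minimal sketch of a custom partitioner that hands every worker the same consuming enumerable, so items are taken one at a time instead of in buffered chunks. The names ConsumingPartitioner and PartitionerDemo are hypothetical, chosen for this illustration; the library's actual types and method names may differ.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: a partitioner whose partitions all pull directly
// from the BlockingCollection, so no chunk buffering takes place.
public sealed class ConsumingPartitioner<T> : Partitioner<T>
{
    private readonly BlockingCollection<T> _collection;

    public ConsumingPartitioner(BlockingCollection<T> collection)
    {
        _collection = collection ?? throw new ArgumentNullException(nameof(collection));
    }

    // Parallel.ForEach only accepts a Partitioner<T> that supports dynamic partitions.
    public override bool SupportsDynamicPartitions => true;

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        IEnumerable<T> dynamicPartitions = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount)
            .Select(_ => dynamicPartitions.GetEnumerator())
            .ToArray();
    }

    // Every enumerator created from this sequence takes items straight from the collection.
    public override IEnumerable<T> GetDynamicPartitions() => _collection.GetConsumingEnumerable();
}

public static class PartitionerDemo
{
    // Runs the producer from the question and returns how many items were consumed.
    public static int Run()
    {
        using var collection = new BlockingCollection<double>();
        Task producer = Task.Run(() =>
        {
            foreach (int i in Enumerable.Range(1, 1000)) collection.Add(i);
            collection.CompleteAdding(); // signals the consumers that no more items will arrive
        });

        int processed = 0;
        Parallel.ForEach(new ConsumingPartitioner<double>(collection),
            item => Interlocked.Increment(ref processed));

        producer.Wait();
        return processed;
    }
}
```

Calling PartitionerDemo.Run() consumes everything the producer adds and returns once CompleteAdding has been called and the collection is drained.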

Recipience answered 7/7, 2011 at 8:54 Comment(5)
What really puzzles me is why Parallel.ForEach throws an exception when I call _collection.CompleteAdding(). – Pamalapamela
@Sam: I wouldn't like to say, to be honest. There's too much deep magic going on there for me to have any confidence in saying the right thing :) – Recipience
The current URL to the Parallel Extensions Extras: code.msdn.microsoft.com/ParExtSamples and someone has made a NuGet package of the extensions: nuget.org/packages/MSFT.ParallelExtensionsExtras – Trimmer
A more recent article on this combination by Can Bilgin: link – Youngs
Link to blog: ParallelExtensionsExtras Tour – #4 – BlockingCollectionExtensions. – Encyclopedia
L
3

Using Parallel.ForEach with a BlockingCollection<T> as the source requires two specific adjustments:

  1. Use the EnumerablePartitionerOptions.NoBuffering option.
  2. Set MaxDegreeOfParallelism to a value other than -1.

Example of correct usage:

Partitioner<Item> partitioner = Partitioner.Create(collection.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

ParallelOptions options = new()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(partitioner, options, item =>
{
    //...
});

Explanation:

  1. EnumerablePartitionerOptions.NoBuffering is required because otherwise Parallel.ForEach will not process each consumed item immediately. Instead it puts the item in a small buffer and waits until the buffer reaches an arbitrary size before processing all the items in it. This behavior introduces undesirable latency, and can even cause deadlocks in some advanced scenarios.
  2. Configuring MaxDegreeOfParallelism is required to keep the ThreadPool usage under control. Otherwise Parallel.ForEach keeps asking for more and more threads, prompting the ThreadPool to create new threads at a rate of one new thread per second, even while the BlockingCollection<T> is completely empty and the parallel loop is idle! For an experimental demonstration of this strange behavior, see this question.
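
Putting both adjustments together with the producer from the question gives something like the sketch below. The class name ThrottledConsumerDemo and the processed counter are assumptions added for illustration; the counter stands in for whatever real work the loop body would do.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledConsumerDemo
{
    // Produces 1000 items on a background task and consumes them in parallel.
    // Returns the number of items the loop body saw.
    public static int Run()
    {
        using var collection = new BlockingCollection<double>();

        Task producer = Task.Run(() =>
        {
            foreach (int i in Enumerable.Range(1, 1000)) collection.Add(i);
            collection.CompleteAdding(); // lets the consuming enumerable finish
        });

        // Adjustment 1: take items one at a time instead of in buffered chunks.
        var partitioner = Partitioner.Create(
            collection.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // Adjustment 2: cap the number of workers so the loop cannot keep
        // requesting extra ThreadPool threads while it waits for items.
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        int processed = 0;
        Parallel.ForEach(partitioner, options, item =>
        {
            Interlocked.Increment(ref processed); // placeholder for real work
        });

        producer.Wait();
        return processed;
    }
}
```

Environment.ProcessorCount is only one reasonable cap; for loop bodies that mostly wait on I/O, a different fixed value may suit better.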
Locarno answered 20/9, 2023 at 5:22 Comment(0)
