Why does iterating over GetConsumingEnumerable() not fully empty the underlying blocking collection?
I have a quantifiable & repeatable problem using the Task Parallel Library, BlockingCollection<T>, ConcurrentQueue<T> & GetConsumingEnumerable while trying to create a simple pipeline.

In a nutshell, adding entries to a default BlockingCollection<T> (which under the hood relies on a ConcurrentQueue<T>) from one thread does not guarantee that they will be popped off the BlockingCollection<T> by another thread calling the GetConsumingEnumerable() method.

I've created a very simple WinForms application to reproduce/simulate this, which just prints integers to the screen.

  • Timer1 is responsible for queueing up the work items. It uses a concurrent dictionary called _tracker so that it knows what it has already added to the blocking collection.
  • Timer2 just logs the count state of both the BlockingCollection and the _tracker.
  • The START button kicks off a Parallel.ForEach which simply iterates over the blocking collection's GetConsumingEnumerable() and starts printing the items to the second list box.
  • The STOP button stops Timer1, preventing more entries from being added to the blocking collection.
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for (var i = 0; i < 3; i++, Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
                _entries.Add(Counter);

            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }
}

Here's the sequence of events:

  • Press Start
  • Timer1 ticks & ListBox1 is immediately updated with 3 messages (Adding 0, 1, 2)
  • ListBox2 is subsequently updated with 3 messages, 1 second apart
    • Processing 0
    • Processing 1
    • Processing 2
  • Timer1 ticks & ListBox1 is immediately updated with 3 messages (Adding 3, 4, 5)
  • ListBox2 is subsequently updated with 2 messages, 1 second apart
    • Processing 3
    • Processing 4
    • Processing 5 is not printed... would appear to have gone "missing"
  • Press STOP to prevent more messages being added by timer 1
  • Wait... "Processing 5" still does not appear

(Screenshot: Missing Entry)

You can see that the concurrent dictionary is still tracking that 1 item has not yet been processed and subsequently removed from _tracker.

If I press Start again, timer1 begins adding 3 more entries and the parallel loop comes back to life, printing 5, 6, 7 & 8.

(Screenshot: Entry returned after subsequent items shoved in behind it)

I'm at a complete loss as to why this occurs. Calling Start again obviously starts a new Task, which calls a Parallel.ForEach, and re-executes GetConsumingEnumerable(), which magically finds the missing entry.

Why does BlockingCollection.GetConsumingEnumerable() not guarantee to iterate over every item that's added to the collection?

Why does the addition of more entries subsequently cause it to get "unstuck" and continue with its processing?

Borgeson answered 18/4, 2012 at 11:23 Comment(1)
Thanks guys. You both pointed me in the right direction, which led me toward this. @Svick this is probably why it's not an issue in the .NET 4.5 beta: connect.microsoft.com/VisualStudio/feedback/details/674705/… and Stephen Toub from the MS parallel team has actually blogged on the issue: blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx – Borgeson

You can't use GetConsumingEnumerable() in Parallel.ForEach().

Use the GetConsumingPartitioner extension method from the TPL Extras (ParallelExtensionsExtras).

In the blog post you will also get an explanation of why you can't use GetConsumingEnumerable():

The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

In other words, Parallel.ForEach waits until it receives a group of work items before continuing, which is exactly what your experiment shows.
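
Here is a minimal sketch of what the start button could look like with that approach. It assumes the GetConsumingPartitioner<T> extension method from the ParallelExtensionsExtras samples (linked from the blog post) has been added to the project; everything else is the same as in your code:

private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
    var options = new ParallelOptions {
                            CancellationToken = _tokenSource.Token,
                            MaxDegreeOfParallelism = 1
                        };

    // GetConsumingPartitioner wraps the consuming enumerable in a partitioner
    // that hands items out without chunk buffering, so Parallel.ForEach does
    // not sit on an item while waiting for a chunk to fill up.
    _factory.StartNew(() =>
    {
        Parallel.ForEach(_entries.GetConsumingPartitioner(), options, DoWork);
    });

    timer1.Enabled = timer2.Enabled = true;
    timer1.Start();
    timer2.Start();
}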

Ambry answered 18/4, 2012 at 12:19 Comment(2)
The TPL extras are under the MS-LPL license, which means if you use them, you're locking your entire derivative work to Windows. It is not an OSI-approved license... – Respecting
@Daniel, good to know. Thanks for the update. Do you happen to know if non-Windows TPL also uses a grouping partitioner by default? – Ambry

As of .NET Framework 4.5, you can create a partitioner which will take only 1 item at a time:

var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(partitioner, parallelOptions, (batch, state) =>
{
    /* Do stuff */
});

EnumerablePartitionerOptions Enum

NoBuffering: Create a partitioner that takes items from the source enumerable one at a time and does not use intermediate storage that can be accessed more efficiently by multiple threads. This option provides support for low latency (items will be processed as soon as they are available from the source) and provides partial support for dependencies between items (a thread cannot deadlock waiting for an item that the thread itself is responsible for processing).
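
Applied to the code in the question, the start of the consuming loop might look something like this (a sketch only, assuming the same _entries, _tokenSource, _factory and DoWork members as in the question's form):

var options = new ParallelOptions {
    CancellationToken = _tokenSource.Token,
    MaxDegreeOfParallelism = 1
};

// NoBuffering hands items to Parallel.ForEach one at a time instead of
// buffering them into chunks, so no entry gets stranded inside the partitioner.
var partitioner = Partitioner.Create(
    _entries.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

_factory.StartNew(() => Parallel.ForEach(partitioner, options, DoWork));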

Dalhousie answered 24/11, 2015 at 18:50 Comment(2)
I confirm that as of .NET 4.5, this is the preferred option. – Mervin
The accepted answer is informative. This one seems more up to date. – Inspect

I couldn't replicate your behavior with a simple console application doing basically the same thing (running on the .NET 4.5 beta, which could make a difference). But I think the reason this happens is that Parallel.ForEach() tries to optimize execution by splitting the input collection into chunks, and with your enumerable, a chunk can't be created until you add more items to the collection. For more information, see Custom Partitioners for PLINQ and TPL on MSDN.

To fix this, don't use Parallel.ForEach(). If you still want to process the items in parallel, you can start a Task in each iteration.
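
A rough sketch of that approach, using the same _entries, _factory and DoWork members as in the question (the SemaphoreSlim throttle is my own addition, with a hypothetical limit of 4, since a plain foreach has no MaxDegreeOfParallelism):

_factory.StartNew(() =>
{
    // Cap the number of concurrently running work tasks (hypothetical limit).
    var throttle = new SemaphoreSlim(4);

    foreach (var entry in _entries.GetConsumingEnumerable())
    {
        throttle.Wait();
        var item = entry; // copy to avoid capturing the loop variable (C# 4)
        _factory.StartNew(() =>
        {
            try { DoWork(item); }
            finally { throttle.Release(); }
        });
    }
});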

Cherisecherish answered 18/4, 2012 at 12:10 Comment(1)
Thanks for checking. One of the "nice" features that Parallel.ForEach provided was the ability to throttle MaxDegreeOfParallelism for me (the real-world version is calling a WCF service). If I just spin through a normal foreach, newing up Tasks in each iteration, how would you suggest I throttle the maximum number of concurrent tasks? – Borgeson

I feel I should note, just for clarity, that in instances where you are able to call the BlockingCollection's CompleteAdding() method prior to executing the Parallel.ForEach, the issue you describe above will not be a problem. I have used these two objects together many times with great results.

In addition, you can always re-create your BlockingCollection after calling CompleteAdding() in order to add more items when needed (_entries = new BlockingCollection<int>();).

Changing the click event code above as follows would solve your problem with the missing entry and make it work as expected, if you click the start and stop buttons multiple times:

private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
    timer1.Stop();
    timer1.Enabled = false;
    _entries.CompleteAdding();                   // added
    _entries = new BlockingCollection<int>();    // added
}
Fume answered 4/11, 2012 at 6:0 Comment(0)
