How do I use a BlockingCollection in the Producer/Consumer pattern when the producers are also the consumers - How do I end?

I have a recursive problem where the consumers do some work at each level of a tree, then need to recurse down the tree and perform that same work at the next level.

I want to use ConcurrentBag/BlockingCollection etc. to run this in parallel. In this scenario, the consumers of the queue are also the producers for the queue!

My problem is this: using BlockingCollection, I can write very simple foreach logic to dequeue items and queue new ones. When the queue is empty, the blocking collection will block correctly and wait for new work to be produced by one of the other consumers.

But how do I know if all of the consumers are blocking?!

I know about CompleteAdding(), but that does not seem to serve, as the only time you are actually complete is when all of the producers are done producing and the queue is empty - and since they would all be blocking, there is nobody "free" to set CompleteAdding(). Is there a way to detect this? (Perhaps an event that can fire when blocking, and fire again when unblocked?)

I can deal with this manually by replacing the foreach with a while(!complete) loop and using TryTake, but then I need to sleep manually, which seems inefficient (avoiding that is the whole reason to use the blocking collection rather than just the concurrent collections in the first place!). Each time through the loop, if TryTake returns false, I could set an Idle flag, and then have a Master check whether the queue is empty and all threads are idle and, if so, set a complete flag. But again, this seems kludgy.

Intuition is telling me there is some way to use BlockingCollection to do this, but I can't quite get there.

Anyway, does anyone have a good pattern for when the consumers are also the producers? Being able to detect when to release all the blocks would be awesome.

Historiated answered 1/6, 2012 at 14:22 Comment(5)
Good question. Anything with external flags or events seems ripe for race conditions. - Potts
Do the processors (combined consumers/producers) have a lot of state or require a lot of resources? Could you re-cast the problem in terms of creating Tasks that each do one iteration only? - Potts
@Potts: Yes, I can do a single iteration, and in fact already have that working, but I am trying to use a producer/consumer pattern because this code may be migrated to the cloud in the future, where worker roles will be using Azure Queue storage in the same way, and I would like to keep the overall logic as similar as possible between the two implementations. In that scenario I will be forced to check whether the workers are all idle to decide that the queue is done, but it seems like I should be efficient where possible locally. Also, I just want to figure this out :) - Historiated
Here is an article that almost answers the question, but not quite, since it assumes all production will be done before consumption begins. I'm thinking I may have to manually sleep and check :( informit.com/guides/content.aspx?g=dotnet&seqNum=842 - Historiated
A somewhat related question: How to mark a TPL dataflow cycle to complete? - Parish

Here is a low-level implementation of a collection similar to the BlockingCollection<T>, with the difference that it completes automatically instead of relying on manually calling the CompleteAdding method. The condition for the automatic completion is that the collection is empty, and all the consumers are in a waiting state.

using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// A blocking collection that completes automatically when it's empty and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private int _consumersCount = 0;
    private int _waitingConsumers = 0;
    private bool _autoCompleteStarted;
    private bool _completed;

    public int Count { get { lock (_queue) return _queue.Count; } }
    public bool IsCompleted => Volatile.Read(ref _completed);

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_completed) throw new InvalidOperationException(
                "The collection has completed.");
            _queue.Enqueue(item);
            Monitor.Pulse(_queue);
        }
    }

    /// <summary>
    /// Begin observing the condition for automatic completion.
    /// </summary>
    public void BeginObservingAutoComplete()
    {
        lock (_queue)
        {
            if (_autoCompleteStarted) return;
            _autoCompleteStarted = true;
            Monitor.PulseAll(_queue);
        }
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        bool waiting = false;
        lock (_queue) _consumersCount++;
        try
        {
            while (true)
            {
                T item;
                lock (_queue)
                {
                    if (_completed) yield break;
                    while (_queue.Count == 0)
                    {
                        if (_autoCompleteStarted &&
                            _waitingConsumers == _consumersCount - 1)
                        {
                            _completed = true;
                            Monitor.PulseAll(_queue);
                            yield break;
                        }
                        waiting = true; _waitingConsumers++;
                        Monitor.Wait(_queue);
                        waiting = false; _waitingConsumers--;
                        if (_completed) yield break;
                    }
                    item = _queue.Dequeue();
                }
                yield return item;
            }
        }
        finally
        {
            lock (_queue)
            {
                _consumersCount--;
                if (waiting) _waitingConsumers--;
                if (!_completed && _autoCompleteStarted &&
                    _waitingConsumers == _consumersCount)
                {
                    _completed = true;
                    Monitor.PulseAll(_queue);
                }
            }
        }
    }
}

The AutoCompleteBlockingCollection<T> offers only the most basic functionality of the BlockingCollection<T> class. Features like bounded capacity and cancellation are not supported.

Usage example:

var queue = new AutoCompleteBlockingCollection<Node>();
queue.Add(rootNode);
queue.BeginObservingAutoComplete();
Task[] workers = Enumerable.Range(1, 4).Select(_ => Task.Run(() =>
{
    foreach (Node node in queue.GetConsumingEnumerable())
    {
        Process(node);
        foreach (Node child in node.Children)
            queue.Add(child);
    }
})).ToArray();
await Task.WhenAll(workers);

The BeginObservingAutoComplete method should be called after adding the initial items to the collection. Before this method is called, the auto-complete condition is not checked. In the above example only one item is added before starting to observe the auto-complete condition. Then four workers are launched, with each worker consuming the collection, processing each consumed node, and then adding the children of this node to the collection. Eventually all nodes of the tree will be consumed, and the last active worker will trigger the automatic completion of the collection. This will allow all workers to exit the consuming loop and complete.

Adding and removing consumers at any time (dynamically) is supported. The collection is thread-safe.

A feature-rich but less efficient implementation of the above collection can be found here.
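
For comparison, here is a minimal sketch of a different technique, which is not part of the collection above: keep the standard BlockingCollection<T> and track the number of outstanding items (queued or currently being processed) with a counter. Each worker counts a child before adding it and decrements the counter once it has finished a node, so whichever worker brings the counter to zero is "free" to call CompleteAdding. The Node type, the PendingCountTraversal class and the CountNodes method are hypothetical names introduced only for this illustration, and counting visits stands in for the real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical tree node, for illustration only.
public sealed class Node
{
    public int Value { get; }
    public List<Node> Children { get; } = new List<Node>();
    public Node(int value) { Value = value; }
}

public static class PendingCountTraversal
{
    // Visits every node reachable from root, using workerCount workers that
    // both consume from and produce into the same BlockingCollection<Node>.
    // Returns the number of nodes visited.
    public static int CountNodes(Node root, int workerCount)
    {
        using (var queue = new BlockingCollection<Node>())
        {
            int pending = 1; // items added but not yet fully processed (the root)
            int visited = 0;
            queue.Add(root);

            Task[] workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(() =>
            {
                foreach (Node node in queue.GetConsumingEnumerable())
                {
                    Interlocked.Increment(ref visited); // stand-in for real work

                    foreach (Node child in node.Children)
                    {
                        // Count the child before other workers can see it,
                        // so the counter never drops to zero too early.
                        Interlocked.Increment(ref pending);
                        queue.Add(child);
                    }

                    // This node is finished. Zero means nothing is queued and
                    // nothing is in flight, so no further Add can happen.
                    if (Interlocked.Decrement(ref pending) == 0)
                        queue.CompleteAdding();
                }
            })).ToArray();

            Task.WaitAll(workers);
            return visited;
        }
    }
}
```

The design choice here is that the counter is incremented before each Add and decremented only after all children of a node have been added. A worker that is still adding children therefore keeps the counter above zero, which is what makes the CompleteAdding call safe under these assumptions. Since this variant uses the standard BlockingCollection<T>, its bounded-capacity and cancellation overloads remain available, although a bounded capacity could deadlock when every worker blocks in Add.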

Parish answered 3/4, 2022 at 20:23 Comment(0)
