BlockingCollection where the consumers are also producers

I have a bunch of requests to process, and during the processing of those requests, more "sub-requests" can be generated and added to the same blocking collection. The consumers add sub-requests to the queue.

It's hard to know when to exit the consuming loop: clearly no thread can call BlockingCollection.CompleteAdding as the other threads may add something to the collection. You also cannot exit the consuming loop just because the BlockingCollection is empty as another thread may have just read the final remaining request from the BlockingCollection and will be about to start generating more requests - the Count of the BlockingCollection will then increase from zero again.

My only idea so far is to use a Barrier: when all threads reach the Barrier, there can't be anything left in the BlockingCollection and no thread can be generating new requests. Here is my code. Is this an acceptable approach? (Please note: this is a highly contrived block of code modelling a much more complex situation; no programmer really writes code that processes random strings 😊)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;

namespace Barrier1
{
    class Program
    {
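        // Note: System.Random instances are not thread-safe, and this one is used
        // from several threads at once. On .NET 6+ Random.Shared avoids that.
        private static readonly Random random = new Random();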
        private static void Main()
        {
            var bc = new BlockingCollection<string>();
            AddRandomStringsToBc(bc, 1000, true);
            int nTasks = 4;
            var barrier = new Barrier(nTasks);
            Action a = () => DoSomething(bc, barrier);
            var actions = Enumerable.Range(0, nTasks).Select(x => a).ToArray();
            Parallel.Invoke(actions);
        }

        private static IEnumerable<char> GetC(bool includeA)
        {
            var startChar = includeA ? 'A' : 'B';
            var add = includeA ? 24 : 25;
            while (true)
            {
                yield return (char)(startChar + random.Next(add));
            }
        }

        private static void DoSomething(BlockingCollection<string> bc, Barrier barrier)
        {
            while (true)
            {
                if (bc.TryTake(out var str))
                {
                    Console.WriteLine(str);
                    if (str[0] == 'A')
                    {
                        Console.WriteLine("Adding more strings...");
                        AddRandomStringsToBc(bc, 100);
                    }
                }
                else
                {
                    // Can't exit the loop here just because there is nothing in the collection.
                    // A different thread may be just about to call AddRandomStringsToBc:
                    if (barrier.SignalAndWait(100))
                    {
                        break;
                    }
                }
            }
        }

        private static void AddRandomStringsToBc(BlockingCollection<string> bc, int n, bool startWithA = false, bool sleep = false)
        {
            var collection = Enumerable.Range(0, n).Select(x => string.Join("", GetC(startWithA).Take(5)));
            foreach (var c in collection)
            {
                bc.Add(c);
            }
        }
    }
}
Jevon answered 1/4, 2022 at 15:0 Comment(4)
Do you know exactly how many consumers/producers there are from the beginning, or is it possible that more consumers/producers will be added later at some unknown moment? – Iolenta
@TheodorZoulias, no. Neither consumers nor producers will be added later. – Jevon
With Barrier you have to keep every thread alive until the end. CountdownEvent would be a better fit for this scenario: learn.microsoft.com/en-us/dotnet/api/… – Leucite
@KevinGosse, the point is that you want to keep every thread alive until the end. All threads should participate in the processing. If a thread dies (e.g. due to an exception), you would simply decrement the participant count (I left that out in my original code). Could you explain how you would use CountdownEvent? Because Signal decrements the count, you would have to call CountdownEvent.AddCount if the other threads have not signalled. The idea is that a thread must not exit until all threads are ready to exit. (i.e. Thread 1 cannot exit, because Thread 2 may queue a request) – Jevon
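
For comparison with the counting idea raised in the comments above, here is a minimal sketch of a pending-work counter. It is not taken from either poster's code: it swaps the Barrier for a plain integer updated with Interlocked, and the names PendingCounterDemo, Run, Enqueue and Consume are hypothetical. The assumption is that every item is counted before it is added and uncounted only after its processing (including any sub-requests it queues) has finished, so a count of zero means nothing is queued or in flight.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class PendingCounterDemo
{
    // Processes every seed plus all generated sub-requests and returns
    // how many items were processed in total.
    public static int Run(IEnumerable<int> seeds, int consumerCount)
    {
        using var queue = new BlockingCollection<int>();
        int pending = 0;    // items queued or currently being processed
        int processed = 0;

        void Enqueue(int item)
        {
            // Count the item before it becomes visible to consumers.
            Interlocked.Increment(ref pending);
            queue.Add(item);
        }

        foreach (var seed in seeds) Enqueue(seed);
        if (pending == 0) return 0; // no work, so nobody would complete the queue

        void Consume()
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                Interlocked.Increment(ref processed);
                // Contrived rule: an item n > 0 generates two sub-requests n - 1.
                if (item > 0) { Enqueue(item - 1); Enqueue(item - 1); }
                // This item is done. If nothing is queued or in flight, finish.
                if (Interlocked.Decrement(ref pending) == 0) queue.CompleteAdding();
            }
        }

        Parallel.Invoke(Enumerable.Repeat((Action)Consume, consumerCount).ToArray());
        return processed;
    }
}
```

With a single seed of 3 and the contrived rule above, the sketch processes 1 + 2 + 4 + 8 = 15 items. Unlike the Barrier approach it needs no timeout or polling, but it only works if every Add goes through the counting wrapper.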

Here is a collection similar to the BlockingCollection<T>, with the difference that it completes automatically instead of relying on manually calling the CompleteAdding method. The condition for the automatic completion is that the collection is empty, and all the consumers are in a waiting state.

The implementation is based on your clever idea of using a Barrier as the mechanism for checking the auto-complete condition. It's not perfect because it relies on polling, which takes place when the collection becomes empty while some consumers are still active. On the other hand, it lets you reuse all the existing functionality of the BlockingCollection<T> class instead of rewriting it from scratch:

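using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

/// <summary>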
/// A blocking collection that completes automatically when it's empty, and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T> : IEnumerable<T>, IDisposable
{
    private readonly BlockingCollection<T> _queue;
    private readonly Barrier _barrier;
    private volatile bool _autoCompleteStarted;
    private volatile int _intervalMilliseconds = 500;

    public AutoCompleteBlockingCollection(int boundedCapacity = -1)
    {
        _queue = boundedCapacity == -1 ? new() : new(boundedCapacity);
        _barrier = new(0, _ => _queue.CompleteAdding());
    }

    public int Count => _queue.Count;
    public int BoundedCapacity => _queue.BoundedCapacity;
    public bool IsAddingCompleted => _queue.IsAddingCompleted;
    public bool IsCompleted => _queue.IsCompleted;

    /// <summary>
    /// Begin observing the condition for automatic completion.
    /// </summary>
    public void BeginObservingAutoComplete() => _autoCompleteStarted = true;

    /// <summary>
    /// Gets or sets how frequently to check for the auto-complete condition.
    /// </summary>
    public TimeSpan CheckAutoCompleteInterval
    {
        get { return TimeSpan.FromMilliseconds(_intervalMilliseconds); }
        set
        {
            int milliseconds = checked((int)value.TotalMilliseconds);
            if (milliseconds < 0) throw new ArgumentOutOfRangeException();
            _intervalMilliseconds = milliseconds;
        }
    }

    public void Add(T item, CancellationToken cancellationToken = default)
        => _queue.Add(item, cancellationToken);

    public bool TryAdd(T item) => _queue.TryAdd(item);

    public IEnumerable<T> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        _barrier.AddParticipant();
        try
        {
            while (true)
            {
                if (!_autoCompleteStarted)
                {
                    if (_queue.TryTake(out var item, _intervalMilliseconds,
                        cancellationToken))
                        yield return item;
                }
                else
                {
                    if (_queue.TryTake(out var item, 0, cancellationToken))
                        yield return item;
                    else if (_barrier.SignalAndWait(_intervalMilliseconds,
                        cancellationToken))
                        break;
                }
            }
        }
        finally { _barrier.RemoveParticipant(); }
    }

    IEnumerator<T> IEnumerable<T>.GetEnumerator()
        => ((IEnumerable<T>)_queue).GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator()
        => ((IEnumerable<T>)_queue).GetEnumerator();

    public void Dispose() { _barrier.Dispose(); _queue.Dispose(); }
}

The BeginObservingAutoComplete method should be called after adding the initial items to the collection. Before this method is called, the auto-complete condition is not checked.

The CheckAutoCompleteInterval is 500 milliseconds by default, and it can be configured at any time.
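
The interval matters because it is used as the timeout passed to Barrier.SignalAndWait. The following standalone sketch illustrates the behaviour the collection relies on: a timed-out wait returns false and the participant can simply try again, while a wait in which all participants arrive returns true and runs the post-phase action once. BarrierTimeoutDemo and its tuple fields are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class BarrierTimeoutDemo
{
    public static (bool Alone, bool Together, int PostPhaseRuns) Run()
    {
        int postPhaseRuns = 0;
        // Two participants; the post-phase action runs when both have arrived.
        using var barrier = new Barrier(2, _ => postPhaseRuns++);

        // Only one of the two participants arrives, so the wait times out.
        bool alone = barrier.SignalAndWait(50);

        // Now both participants arrive, so both calls succeed.
        var other = Task.Run(() => barrier.SignalAndWait(5000));
        bool together = barrier.SignalAndWait(5000) && other.Result;

        return (alone, together, postPhaseRuns);
    }
}
```

In the collection above, the first case corresponds to a consumer finding the queue empty while another consumer is still busy, and the second case to all consumers being idle at the same time.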

The Take and TryTake methods are missing on purpose. The collection is intended to be consumed via the GetConsumingEnumerable method. This way the collection keeps track of the currently subscribed consumers, in order to know when to auto-complete. Consumers can be added and removed at any time. A consumer can be removed by exiting the foreach loop, either by break/return etc, or by an exception.
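
The reason exiting the foreach loop removes a consumer is that leaving the loop disposes the iterator, which runs its finally block. Here is a small standalone sketch of that C# behaviour, with a plain counter standing in for the barrier's participant count. IteratorCleanupDemo and its members are hypothetical names, not part of the collection above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only.
public static class IteratorCleanupDemo
{
    // Stand-in for the barrier's participant count.
    public static int ActiveConsumers;

    public static IEnumerable<int> Consume()
    {
        ActiveConsumers++;                 // plays the role of AddParticipant
        try
        {
            for (int i = 0; ; i++) yield return i;
        }
        finally { ActiveConsumers--; }     // plays the role of RemoveParticipant
    }

    public static int BreakEarly()
    {
        foreach (var item in Consume())
        {
            if (item == 2) break;          // leaving the loop disposes the enumerator
        }
        return ActiveConsumers;            // the finally block has already run
    }
}
```

One consequence of iterator semantics is worth keeping in mind: the body of an iterator method does not start executing until the first MoveNext call, so in this sketch a consumer is registered when enumeration begins, not when Consume is called.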

Usage example:

private static void Main()
{
    var bc = new AutoCompleteBlockingCollection<string>();
    AddRandomStringsToBc(bc, 1000, true);
    bc.BeginObservingAutoComplete();
    Action action = () => DoSomething(bc);
    var actions = Enumerable.Repeat(action, 4).ToArray();
    Parallel.Invoke(actions);
}

private static void DoSomething(AutoCompleteBlockingCollection<string> bc)
{
    foreach (var str in bc.GetConsumingEnumerable())
    {
        Console.WriteLine(str);
        if (str[0] == 'A')
        {
            Console.WriteLine("Adding more strings...");
            AddRandomStringsToBc(bc, 100);
        }
    }
}

The collection is thread-safe, with the exception of the Dispose method.

Iolenta answered 2/4, 2022 at 6:53 Comment(0)
