I have a bunch of requests to process, and during the processing of those requests, more "sub-requests" can be generated and added to the same blocking collection. The consumers add sub-requests to the queue.
It's hard to know when to exit the consuming loop: clearly no thread can call BlockingCollection.CompleteAdding
as the other threads may add something to the collection. You also cannot exit the consuming loop just because the BlockingCollection
is empty as another thread may have just read the final remaining request from the BlockingCollection
and will be about to start generating more requests - the Count
of the BlockingCollection
will then increase from zero again.
My only idea on this so far is to use a Barrier
- when all threads reach the Barrier
, there can't be anything left in the BlockingCollection
and no thread can be generating new requests. Here is my code - is this an acceptable approach? (and please note: this is highly contrived block of code modelling a much more complex situation: no programmer really writes code that processes random strings π )
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;
namespace Barrier1
{
class Program
{
private static readonly Random random = new Random();
private static void Main()
{
var bc = new BlockingCollection<string>();
AddRandomStringsToBc(bc, 1000, true);
int nTasks = 4;
var barrier = new Barrier(nTasks);
Action a = () => DoSomething(bc, barrier);
var actions = Enumerable.Range(0, nTasks).Select(x => a).ToArray();
Parallel.Invoke(actions);
}
private static IEnumerable<char> GetC(bool includeA)
{
var startChar = includeA ? 'A' : 'B';
var add = includeA ? 24 : 25;
while (true)
{
yield return (char)(startChar + random.Next(add));
}
}
private static void DoSomething(BlockingCollection<string> bc, Barrier barrier)
{
while (true)
{
if (bc.TryTake(out var str))
{
Console.WriteLine(str);
if (str[0] == 'A')
{
Console.WriteLine("Adding more strings...");
AddRandomStringsToBc(bc, 100);
}
}
else
{
// Can't exit the loop here just because there is nothing in the collection.
// A different thread may be just about to call AddRandomStringsToBc:
if (barrier.SignalAndWait(100))
{
break;
}
}
}
}
private static void AddRandomStringsToBc(BlockingCollection<string> bc, int n, bool startWithA = false, bool sleep = false)
{
var collection = Enumerable.Range(0, n).Select(x => string.Join("", GetC(startWithA).Take(5)));
foreach (var c in collection)
{
bc.Add(c);
}
}
}
}
Barrier
you have to keep every thread alive until the end.CountdownEvent
would be a better fit for this scenario: learn.microsoft.com/en-us/dotnet/api/… β LeuciteCountdownEvent
? BecauseSignal
decrements the count, you would have to callCountdownEvent.AddCount
if the other threads have not signalled. The idea is that a thread must not exit until all threads are ready to exit. (i.e. Thread 1 cannot exit, because Thread 2 may queue a request) β Jevon