Wait until a BlockingCollection queue is cleared by a background thread, with a timeout if it takes too long?

In C#, I'm wondering if it's possible to wait until a BlockingCollection is cleared by a background thread, with a timeout if it takes too long.

The temporary code that I have at the moment strikes me as somewhat inelegant (since when is it good practice to use Thread.Sleep?):

while (_blockingCollection.Count > 0 || !_blockingCollection.IsAddingCompleted)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(20));
    // [extra code to break if it takes too long]
}
Deportment answered 7/1, 2014 at 13:22 Comment(5)
I assume that you are only doing this AFTER calling BlockingCollection.CompleteAdding() - correct? Anyway, there's not really any better way than what you have. - Deus
@Matthew Watson. Good point; I've added an extra term to make sure that there are no queued items that haven't been added to the queue. I've also clarified the question to mention that a background thread is clearing the queue out. - Deportment
What do you mean exactly by "wait"? If you are using TryTake you can indeed specify a timeout. You are more limited if you are using the enumerator, but in principle you could make an enumerator wrapper that uses TryTake. - Jeuz
@Deportment I think your loop condition should be _blockingCollection.Count != 0 || !_blockingCollection.IsAddingCompleted (note the use of || instead of &&). - Deus
@Jeuz I wish that I could fix it using this method, but currently the code is waiting for the background thread to clear out the queue as part of the unit testing process, so it can't actually use TryTake itself. - Deportment

What if you write something like this in your consuming thread:

var timeout = TimeSpan.FromMilliseconds(10000);
T item;
while (_blockingCollection.TryTake(out item, timeout))
{
     // do something with item
}
// If we end up here, either TryTake timed out or we are out of items.
if (!_blockingCollection.IsAddingCompleted)
   throw new MyTimeoutException();
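
For reference, here is a minimal self-contained sketch of the same idea, in case it helps to see it end to end. The names DrainDemo and Drain and the handle callback are hypothetical, not part of the code above. It assumes that IsCompleted (adding completed and the collection empty) is a reasonable way to tell "drained" apart from "timed out" once TryTake returns false.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class DrainDemo
{
    // Hypothetical helper: consumes items until TryTake gives up.
    // Returns true if the queue was fully drained (adding completed and empty),
    // false if no item arrived within idleTimeout.
    public static bool Drain<T>(BlockingCollection<T> queue, TimeSpan idleTimeout, Action<T> handle)
    {
        T item;
        while (queue.TryTake(out item, idleTimeout))
        {
            handle(item);
        }

        // TryTake returned false: either the queue is completed, or the wait timed out.
        return queue.IsCompleted;
    }

    public static void Main()
    {
        var queue = new BlockingCollection<int>();

        // Producer: adds a few items, then signals that no more will come.
        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 5; ++i)
                queue.Add(i);
            queue.CompleteAdding();
        });

        int sum = 0;
        bool drained = Drain(queue, TimeSpan.FromSeconds(10), n => sum += n);
        producer.Wait();

        Console.WriteLine(drained ? "Drained, sum = " + sum : "Timed out");
    }
}
```

Note that the timeout here applies to each wait for the next item, not to the drain as a whole, so a slow but steady producer will never trigger it.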
Jeuz answered 7/1, 2014 at 13:51 Comment(0)

You can use GetConsumingEnumerable() with foreach in the consuming thread to detect when the queue has been drained, and then set a ManualResetEvent which the main thread can wait on (with a timeout) to find out whether the queue is empty. GetConsumingEnumerable() returns an enumerable that blocks while the queue is empty and only terminates once the queue is empty and CompleteAdding() has been called.

Sample code:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private void run()
        {
            Task.Run(new Action(producer));
            Task.Run(new Action(consumer));

            while (!_empty.WaitOne(1000))
                Console.WriteLine("Waiting for queue to empty");

            Console.WriteLine("Queue emptied.");
        }

        private void producer()
        {
            for (int i = 0; i < 20; ++i)
            {
                _queue.Add(i);
                Console.WriteLine("Produced " + i);
                Thread.Sleep(100);
            }

            _queue.CompleteAdding();
        }

        private void consumer()
        {
            foreach (int n in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumed " + n);
                Thread.Sleep(200);
            }

            _empty.Set();
        }

        private static void Main()
        {
            new Program().run();
        }

        private BlockingCollection<int> _queue = new BlockingCollection<int>();

        private ManualResetEvent _empty = new ManualResetEvent(false);
    }
}
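
The sample above keeps waiting in one-second steps with no overall deadline. If a single deadline is wanted, one possible variation (a sketch, not part of the sample) is to wait on the consumer task itself with Task.Wait(TimeSpan), which returns false if the task has not finished in time. The names WaitForDrainDemo, ConsumeWithTimeout and workMs are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class WaitForDrainDemo
{
    // Hypothetical helper: starts a consumer over the queue and waits up to
    // `timeout` for it to finish. Returns true if the queue was drained in time.
    public static bool ConsumeWithTimeout(BlockingCollection<int> queue, TimeSpan timeout, int workMs)
    {
        Task consumer = Task.Run(() =>
        {
            // Ends only when the queue is empty AND CompleteAdding() has been called.
            foreach (int n in queue.GetConsumingEnumerable())
                Thread.Sleep(workMs); // simulate processing the item
        });

        // Task.Wait(TimeSpan) returns false on timeout; the consumer keeps running.
        return consumer.Wait(timeout);
    }

    public static void Main()
    {
        var queue = new BlockingCollection<int>();

        Task.Run(() =>
        {
            for (int i = 0; i < 5; ++i)
                queue.Add(i);
            queue.CompleteAdding();
        });

        bool drained = ConsumeWithTimeout(queue, TimeSpan.FromSeconds(5), 10);
        Console.WriteLine(drained ? "Queue emptied." : "Timed out waiting for queue to empty.");
    }
}
```

On a timeout the consumer task is not cancelled; it simply has not finished yet, so a test would still need to decide what to do with it.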
Deus answered 7/1, 2014 at 14:4 Comment(0)

If you can redesign to allow an event to be set when the collection is empty, you could use a wait handle to wait for the collection to be empty:

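static void Main(string[] args)
{
    BlockingCollection<int> bc = new BlockingCollection<int>();
    ManualResetEvent emptyEvent = new ManualResetEvent(false);
    Thread newThread = new Thread(() => BackgroundProcessing(bc, emptyEvent));
    newThread.IsBackground = true; // don't keep the process alive if we give up waiting
    newThread.Start();
    // [the producer adds items here, then calls bc.CompleteAdding()]
    // Wait for the collection to be empty or the timeout to be reached.
    if (!emptyEvent.WaitOne(1000))
        Console.WriteLine("Timed out waiting for the collection to empty.");
}
static void BackgroundProcessing(BlockingCollection<int> collection, ManualResetEvent emptyEvent)
{
    // IsCompleted is true once CompleteAdding() has been called and the collection is empty.
    while (!collection.IsCompleted)
    {
        int value;
        // Unlike Take(), TryTake returns false instead of throwing if
        // CompleteAdding() is called while it is waiting on an empty collection.
        if (collection.TryTake(out value, 100))
            Thread.Sleep(100); // simulate processing the item
    }
    emptyEvent.Set();
}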
Paske answered 7/1, 2014 at 13:51 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.