Delayed Producer Consumer pattern

I have a producer that produces integers in bursts (1 to 50 in a few seconds). I have a consumer that consumes those integers in blocks.

I want the consumer to start consuming when the producer has finished its burst. (I have no control over the producer; the only way I can tell that it has finished is that nothing has been produced for 5 seconds.)

I thought about these two different ways:

First: one consumer notifying the other:

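private readonly List<int> _ids = new List<int>();
private readonly ManualResetEvent _mainWaiter = new ManualResetEvent(false);
private readonly ManualResetEvent _secondaryWaiter = new ManualResetEvent(false);

//This method consumes the id from the producer
public void OnConsumeId(int newId)
{
    lock(_ids)
    {
        _ids.Add(newId);
        _mainWaiter.Set();
        _secondaryWaiter.Set();
    }
}

//This method runs on the dedicated thread:
public void ConsumerIdByBlock()
{
    while(true)
    {
        // Block (without polling) until at least one id is available.
        _mainWaiter.WaitOne();

        // Keep waiting while ids keep arriving; stop after 5 s of silence.
        // A ManualResetEvent stays signaled until Reset, so reset it before
        // each wait, otherwise WaitOne(5000) would return true forever.
        do { _secondaryWaiter.Reset(); } while(_secondaryWaiter.WaitOne(5000));

        List<int> localIds;
        lock(_ids)
        {
            localIds = new List<int>(_ids);
            _ids.Clear();
            _mainWaiter.Reset(); // nothing left: wait for the next burst
        }
        //Do the job with localIds
    }
}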

Second: keep a kind of token for the last update:

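private int _lastToken;

//This method consumes the id from the producer
public void OnConsumeId(int newId)
{
    lock(_ids)
    {
        _ids.Add(newId);
        // Take the token while holding the lock; writing ++_lastToken inside
        // the lambda would only increment it when the work item runs.
        int token = ++_lastToken;
        ThreadPool.QueueUserWorkItem(_ => ConsumerIdByBlock(token));
    }
}

//This method runs on a thread-pool thread:
public void ConsumerIdByBlock(int myToken)
{
    Thread.Sleep(5000);

    List<int> localIds;
    lock(_ids)
    {
        // A newer id arrived during the sleep: its own work item will handle the block.
        if(myToken != _lastToken)
            return;

        localIds = new List<int>(_ids);
        _ids.Clear();
    }

    //Do the job with localIds
}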

But I find these approaches a bit too complicated for such a task. Does a native or simpler solution exist? How would you do it?

Hospers asked 1/5, 2014 at 14:51. Comments:
Why not just store the integers as sets in the list, to represent each burst? For example, List<List<int>>, or List<HashSet<int>> if they are unique. - Ulcerative
@Chris Laplante, that does not solve the issue of knowing when to start consuming them. - Hospers
@ConradFrix, in fact it should be lock(_ids) {localIds = new List<int>(_ids); _ids.Clear();}. I fixed it in the post. - Hospers
@Toto: The point is, the producer only places the set on the queue when it is complete and ready to be consumed. - Ulcerative
@ChrisLaplante, OK, but how does the producer know it is complete? - Hospers

This becomes a lot easier if you use a thread-safe queue that already has notification and such. The BlockingCollection makes writing producer-consumer stuff really easy.

I like your "linked consumer" idea because you don't have to modify the producer in order to use it. That is, the producer just stuffs things in a queue. How the consumer ultimately uses it is irrelevant. Using BlockingCollection, then, you'd have:

BlockingCollection<ItemType> inputQueue = new BlockingCollection<ItemType>();
BlockingCollection<List<ItemType>> intermediateQueue = new BlockingCollection<List<ItemType>>();

Your producer adds things to the input queue by calling inputQueue.Add. Your intermediate consumer (call it the consolidator) gets things from the queue by calling TryTake with a timeout. For example:

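List<ItemType> items = new List<ItemType>();
while (!inputQueue.IsCompleted)
{
    ItemType t;
    // Keep taking items until none arrives for 5 seconds
    while (inputQueue.TryTake(out t, TimeSpan.FromSeconds(5)))
    {
        items.Add(t);
    }
    if (items.Count > 0)
    {
        // Add this list of items to the intermediate queue
        intermediateQueue.Add(items);
        items = new List<ItemType>();
    }
}
// The producer has called inputQueue.CompleteAdding(): let the second consumer finish too
intermediateQueue.CompleteAdding();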

The second consumer just reads the intermediate queue:

foreach (var itemsList in intermediateQueue.GetConsumingEnumerable())
{
    // do something with the items list
}

No need for ManualResetEvent or lock or any of that; BlockingCollection handles all the messy concurrency stuff for you.
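Putting the two snippets above together, an end-to-end version might look like the following minimal sketch. It assumes .NET 4.0 or later (where BlockingCollection lives) and wraps everything in a hypothetical BurstConsolidator<T> class; the quiet period is a constructor parameter instead of a hard-coded 5 seconds so it can be shortened for testing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Sketch of the answer's two-stage pipeline (requires .NET 4.0+).
// BurstConsolidator<T> is a hypothetical name, not a framework type.
public sealed class BurstConsolidator<T>
{
    private readonly BlockingCollection<T> _input = new BlockingCollection<T>();
    private readonly BlockingCollection<List<T>> _bursts = new BlockingCollection<List<T>>();
    private readonly TimeSpan _quietPeriod;

    public BurstConsolidator(TimeSpan quietPeriod)
    {
        _quietPeriod = quietPeriod;
        // The consolidator loop gets its own long-running task.
        Task.Factory.StartNew(() => Consolidate(), TaskCreationOptions.LongRunning);
    }

    // Producer side: called for each item.
    public void Add(T item) { _input.Add(item); }

    // Producer side: called once nothing will ever be produced again.
    public void CompleteAdding() { _input.CompleteAdding(); }

    // Block consumer side: each element is one complete burst.
    public IEnumerable<List<T>> Bursts
    {
        get { return _bursts.GetConsumingEnumerable(); }
    }

    private void Consolidate()
    {
        var items = new List<T>();
        while (!_input.IsCompleted)
        {
            T item;
            // Keep taking while items arrive within the quiet period.
            while (_input.TryTake(out item, _quietPeriod))
                items.Add(item);

            if (items.Count > 0)
            {
                _bursts.Add(items);
                items = new List<T>();
            }
        }
        // Lets the block consumer's foreach loop terminate.
        _bursts.CompleteAdding();
    }
}
```

The producer calls Add (and CompleteAdding when it is done for good), while the block consumer runs `foreach (var burst in consolidator.Bursts)`. As discussed in the comments, the consolidator still wakes up once per quiet period while idle.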

Etesian answered 1/5, 2014 at 16:09. Comments:
Thanks for the answer, but I need a .NET 3.5 solution. Also, as a (little) drawback, we are performing a TryTake every 5 seconds even if nothing is enqueued (that was the point of the ManualResetEvent). - Hospers
@Toto: The .NET 3.5 requirement is something that you should add to your question. That's a pretty important requirement. As for the 5 second polling, I'll agree that it's less than ideal, but a five second polling interval isn't going to be a performance bottleneck. - Etesian
Requirement added, but it is still interesting to learn something, even if it is not used for my specific case :) - Hospers
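For .NET 3.5, where BlockingCollection is not available, one possible substitute (not something proposed in this thread) is Monitor.Wait/Monitor.Pulse. In the sketch below, the hypothetical QuietPeriodBuffer class blocks with no timeout while nothing has been produced, which avoids the idle polling mentioned above; only after the first id of a burst arrives does it wait with the quiet-period timeout (a parameter here, 5000 ms in the question).

```csharp
using System.Collections.Generic;
using System.Threading;

// Sketch for .NET 3.5 using Monitor.Wait/Pulse; QuietPeriodBuffer is hypothetical.
public sealed class QuietPeriodBuffer
{
    private readonly object _sync = new object();
    private readonly List<int> _ids = new List<int>();
    private readonly int _quietPeriodMs;

    public QuietPeriodBuffer(int quietPeriodMs) { _quietPeriodMs = quietPeriodMs; }

    // Producer side.
    public void OnConsumeId(int newId)
    {
        lock (_sync)
        {
            _ids.Add(newId);
            Monitor.Pulse(_sync); // wake the consumer if it is waiting
        }
    }

    // Consumer side: blocks until a burst has ended, then returns it.
    public List<int> WaitForBurst()
    {
        lock (_sync)
        {
            // Phase 1: wait with no timeout until the first id arrives.
            while (_ids.Count == 0)
                Monitor.Wait(_sync);

            // Phase 2: Wait returns true when pulsed by a new id and false
            // once quietPeriodMs elapse without a pulse.
            while (Monitor.Wait(_sync, _quietPeriodMs)) { }

            // Snapshot taken without releasing the lock after the timeout.
            var burst = new List<int>(_ids);
            _ids.Clear();
            return burst;
        }
    }
}
```

A dedicated consumer thread would call WaitForBurst in a loop and process each returned list.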

To expand on the idea of @Chris: when you consume an id, remember what time it is. If more than 5 seconds have passed since the last one, start a new list and set an event. Your block consumer simply waits on that event and consumes the stored list.
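A minimal sketch of that idea for .NET 3.5, under one reading of it: a burst is closed when the first id of the next burst arrives after the quiet period. TimestampBurstSplitter and WaitForBurst are hypothetical names. The trade-off is that the most recent burst stays pending until another id is produced, so this only fits if that delay is acceptable.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Sketch of the timestamp idea (.NET 3.5 compatible); names are hypothetical.
public sealed class TimestampBurstSplitter
{
    private readonly object _sync = new object();
    private readonly TimeSpan _quietPeriod;
    private readonly Stopwatch _sinceLastId = new Stopwatch();
    private readonly Queue<List<int>> _completed = new Queue<List<int>>();
    private readonly AutoResetEvent _burstReady = new AutoResetEvent(false);
    private List<int> _current = new List<int>();

    public TimestampBurstSplitter(TimeSpan quietPeriod) { _quietPeriod = quietPeriod; }

    // Producer callback: remembers when the last id arrived.
    public void OnConsumeId(int newId)
    {
        lock (_sync)
        {
            if (_current.Count > 0 && _sinceLastId.Elapsed > _quietPeriod)
            {
                // Gap detected: the previous burst is complete.
                _completed.Enqueue(_current);
                _current = new List<int>();
                _burstReady.Set();
            }
            _current.Add(newId);
            _sinceLastId.Reset(); // Stopwatch.Restart does not exist in 3.5
            _sinceLastId.Start();
        }
    }

    // Block consumer: returns a completed burst, or null if none became
    // available within millisecondsTimeout of the last wake-up.
    public List<int> WaitForBurst(int millisecondsTimeout)
    {
        while (true)
        {
            lock (_sync)
            {
                if (_completed.Count > 0) return _completed.Dequeue();
            }
            if (!_burstReady.WaitOne(millisecondsTimeout)) return null;
        }
    }
}
```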

Also note that in your first solution, it is possible for ConsumerIdByBlock to exit the inner while loop just before OnConsumeId acquires the lock; ConsumerIdByBlock would then consume at least one id too many.

Pelerine answered 1/5, 2014 at 15:27

A queue seems the best way of handling what you've described.

Thread-safe collections are, unfortunately, a bit of a misnomer. There are "blocking" collections in .NET, but the basic principle of the newer classes is not to try to make instance-based classes "thread safe" (static classes are a different story). Class-level "thread safety" is a do-everything-for-everyone proposition, and the other side of that coin is "nothing for no one". Such a class can't optimize for a particular application's needs or usage, so it ends up assuming the worst-case scenario, and because it can't account for every scenario in every application, it sometimes misses things. Those gaps then have to be covered by some other means, and the interaction between the two means has to be managed separately.

Queues follow a basic reader/writer pattern that can be encapsulated with a locking primitive called a reader-writer lock. .NET provides the ReaderWriterLockSlim class for this, which can be used to make the application's use of a queue collection thread-safe.
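A minimal sketch of a queue guarded by ReaderWriterLockSlim (available since .NET 3.5); GuardedQueue<T> is a hypothetical name. Both Enqueue and TryDequeue modify the queue, so both need the write lock; only read-only members such as Count can share the read lock, which means that in a pure producer/consumer setting this behaves much like a plain lock. Detecting the 5-second quiet period still has to be done separately.

```csharp
using System.Collections.Generic;
using System.Threading;

// Sketch: Queue<T> access guarded by ReaderWriterLockSlim (.NET 3.5+).
public sealed class GuardedQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();

    // Mutates the queue: exclusive write lock.
    public void Enqueue(T item)
    {
        _rw.EnterWriteLock();
        try { _queue.Enqueue(item); }
        finally { _rw.ExitWriteLock(); }
    }

    // Also mutates the queue: exclusive write lock.
    public bool TryDequeue(out T item)
    {
        _rw.EnterWriteLock();
        try
        {
            if (_queue.Count == 0) { item = default(T); return false; }
            item = _queue.Dequeue();
            return true;
        }
        finally { _rw.ExitWriteLock(); }
    }

    // Read-only: several readers may hold the read lock at once.
    public int Count
    {
        get
        {
            _rw.EnterReadLock();
            try { return _queue.Count; }
            finally { _rw.ExitReadLock(); }
        }
    }
}
```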

Precentor answered 1/5, 2014 at 17:12

I would use a System.Timers.Timer. Set a 5000ms interval, and every time a new id is produced, restart the Timer:

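   using System.Collections.Generic;
   using System.Timers;

   class Consumer
   {
      List<int> _ids = new List<int>();
      Timer producer_timer = new Timer();

      public Consumer()
      {
         producer_timer.Elapsed += ProducerStopped;
         producer_timer.AutoReset = false; // raise Elapsed only once per start
      }

      public void OnConsumeId(int newId)
      {
         lock (_ids)
         {
            _ids.Add(newId);
            // Setting Interval on a running timer resets its countdown,
            // so Elapsed fires only after 5000 ms without a new id.
            producer_timer.Interval = 5000;
            producer_timer.Start();
         }
      }

      public void ProducerStopped(object o, ElapsedEventArgs e)
      {
         // Raised on a thread-pool thread: take the burst under the lock.
         List<int> ids;
         lock (_ids)
         {
            ids = new List<int>(_ids);
            _ids.Clear();
         }
         // Do job here with ids.
      }
   }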
Duckboard answered 17/5, 2014 at 1:02. Comments:
Why not a System.Threading.Timer, which has a Change method? - Phyllode
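Following that comment's suggestion, here is a sketch using System.Threading.Timer (available in .NET 3.5). Each new id calls Change to push the due time forward, so the callback runs only after the quiet period passes with no new id. DebouncedIdConsumer and the onBurst callback are hypothetical names. As with the other approaches, an id that arrives just as the timer fires may end up in the burst being flushed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch: System.Threading.Timer used as a debounce (.NET 3.5+).
// DebouncedIdConsumer and onBurst are hypothetical names.
public sealed class DebouncedIdConsumer : IDisposable
{
    private readonly object _sync = new object();
    private readonly List<int> _ids = new List<int>();
    private readonly int _quietPeriodMs;
    private readonly Action<List<int>> _onBurst; // the "do job" step
    private readonly Timer _timer;

    public DebouncedIdConsumer(int quietPeriodMs, Action<List<int>> onBurst)
    {
        _quietPeriodMs = quietPeriodMs;
        _onBurst = onBurst;
        // Created disarmed: infinite due time, no periodic signaling.
        _timer = new Timer(OnQuiet, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void OnConsumeId(int newId)
    {
        lock (_sync)
        {
            _ids.Add(newId);
            // (Re)arm as a one-shot timer quietPeriodMs from now.
            _timer.Change(_quietPeriodMs, Timeout.Infinite);
        }
    }

    // Timer callback, runs on a thread-pool thread.
    private void OnQuiet(object state)
    {
        List<int> burst;
        lock (_sync)
        {
            if (_ids.Count == 0) return;
            burst = new List<int>(_ids);
            _ids.Clear();
        }
        _onBurst(burst);
    }

    public void Dispose() { _timer.Dispose(); }
}
```

Unlike the polling variants, nothing runs while the producer is idle; the timer is only armed while a burst is in progress.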
