Can Bounded BlockingCollections Lose Data During Adds?
I have a BlockingCollection(ConcurrentBag, 50000) in which I am trying to use a very small bounded capacity of 50,000 for the producer threads, in order to maximize the number of records I can process in my consumer thread's ConcurrentDictionary. The producer is much, much faster than the consumer and would otherwise consume most of the memory.

Unfortunately, I immediately noticed that after adding the bounded capacity of 50,000, the total number of records in my ConcurrentDictionary is substantially lower than it should be when my test data runs. I read that the BlockingCollection's .Add() method should block indefinitely until there is space in the collection for the add to go through. However, this does not appear to be the case.
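A minimal harness for reproducing this setup might look like the following sketch. It assumes int items, one consumer thread, several producer threads, and a hypothetical helper named RunBoundedAdd; it is written as C# top-level statements (.NET 6 or later assumed). If a bounded Add() ever dropped an item, the returned count would be lower than itemCount.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: sends itemCount distinct ints through a bounded BlockingCollection
// and returns how many of them ended up in the consumer's ConcurrentDictionary.
int RunBoundedAdd(int itemCount, int boundedCapacity, int producerCount)
{
    using var tokens = new BlockingCollection<int>(new ConcurrentBag<int>(), boundedCapacity);
    var results = new ConcurrentDictionary<int, bool>();

    // One consumer: GetConsumingEnumerable() blocks while the collection is empty
    // and finishes after CompleteAdding() once every item has been taken.
    var consumer = new Thread(() =>
    {
        foreach (int item in tokens.GetConsumingEnumerable())
        {
            results.TryAdd(item, true); // ConcurrentDictionary.TryAdd, not BlockingCollection.TryAdd
        }
    });
    consumer.Start();

    // Several producers split the range 0..itemCount-1 between them.
    var producers = new Thread[producerCount];
    for (int p = 0; p < producerCount; p++)
    {
        int id = p; // copy the loop variable for the closure
        producers[p] = new Thread(() =>
        {
            for (int i = id; i < itemCount; i += producerCount)
            {
                tokens.Add(i); // blocks while the collection is full; the item is not dropped
            }
        });
        producers[p].Start();
    }

    foreach (var producer in producers) producer.Join();
    tokens.CompleteAdding(); // only after every producer has finished adding
    consumer.Join();
    return results.Count;
}

Console.WriteLine(RunBoundedAdd(100_000, 50, 4)); // prints 100000
```

Dedicated threads are used instead of Task.Run so that producers blocked in Add() cannot starve the thread pool that the consumer would otherwise depend on.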

Questions:

  1. Will a BlockingCollection's .Add() method eventually time out or silently fail if too many Add() calls are made before capacity in the BlockingCollection frees up?

  2. If the answer to #1 is yes, how many Add() calls can I attempt after the bounded capacity has been reached without losing data?

  3. If many BlockingCollection .Add() calls are waiting / blocking for capacity and the CompleteAdding() method is called, will those waiting / blocking adds continue to wait and then eventually add, or do they silently fail?

Emanuele asked 31/10, 2012 at 3:48
Comments:
You have 50,000 objects in your collection? Wow! Anyway, I would be very surprised if a BlockingCollection failed 'silently', without blocking or throwing, under any operation. If it does, it's hugely broken. – Herbie
@Martin, I had up to a maximum of 7 million in there when I started tracking it, before adding the bounded capacity :) It is a very big process, and the producer is much faster than the consumer. – Emanuele
Sounds like you need some flow control. – Herbie

If you are using a BlockingCollection together with a ConcurrentDictionary, make sure you do not have a BlockingCollection.TryAdd(myobject) call hidden somewhere in your code that you are mistaking for ConcurrentDictionary.TryAdd(). If the BlockingCollection's bounded capacity has been reached, BlockingCollection.TryAdd(myobject) returns false immediately and the item is not added, which produces a "silent fail" unless the return value is checked.
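A minimal sketch of that difference, assuming a collection bounded to a single item and a hypothetical helper named RunTryAddWhenFull (C# top-level statements, .NET 6 or later assumed): the first TryAdd() succeeds, the second returns false, and the rejected item is simply not stored.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: tries to add two items to a collection bounded to one item.
(bool first, bool second, int count) RunTryAddWhenFull()
{
    using var tokens = new BlockingCollection<string>(new ConcurrentBag<string>(), 1);
    bool first = tokens.TryAdd("a");  // a slot is free: returns true
    bool second = tokens.TryAdd("b"); // collection is full: returns false at once, "b" is not stored
    return (first, second, tokens.Count);
}

var result = RunTryAddWhenFull();
Console.WriteLine(result); // prints (True, False, 1)
```

Checking the return value of TryAdd(), or calling Add() instead so the producer blocks, avoids the silent drop.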

  1. The BlockingCollection's .Add() method does not appear to "silently fail" or lose adds, even when far more adds are attempted than the bounded capacity allows. However, if too many .Add() calls are left waiting on a BlockingCollection that is at capacity, the process will eventually run out of memory. (This would have to be a very extreme case of flow-control issues.)
  2. See #1.
  3. My own tests seem to indicate that once the CompleteAdding() method is called, all subsequent adds fail (Add() throws an InvalidOperationException), as described in the MSDN docs.
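Question 3 can be checked with a sketch like the following, assuming a collection bounded to one item and a hypothetical helper named RunCompleteAddingWhileBlocked (C# top-level statements, .NET 6 or later assumed). Based on the documented behavior, a producer blocked in Add() is woken by CompleteAdding() and receives an InvalidOperationException, so the failure is not silent, and the item it was trying to add is not stored.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: blocks a producer in Add() on a full collection, then calls CompleteAdding().
// Returns the exception type the producer saw and the final item count.
(Type? error, int count) RunCompleteAddingWhileBlocked()
{
    using var tokens = new BlockingCollection<int>(new ConcurrentBag<int>(), 1);
    tokens.Add(1); // the collection is now full

    Type? error = null;
    var producer = new Thread(() =>
    {
        try
        {
            tokens.Add(2); // no free slot, so this call blocks
        }
        catch (InvalidOperationException ex)
        {
            error = ex.GetType();
        }
    });
    producer.Start();

    Thread.Sleep(100);       // give the producer time to block (the outcome does not depend on it)
    tokens.CompleteAdding(); // the blocked Add() wakes up and throws
    producer.Join();
    return (error, tokens.Count);
}

Console.WriteLine(RunCompleteAddingWhileBlocked());
```

If the producer has not reached Add() yet when CompleteAdding() runs, the call still throws InvalidOperationException, because Add() checks whether adding has been completed before it waits.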

A Final Note Regarding Performance

It appears that (in my own case, anyway) using a bounded capacity with .Add() on a BlockingCollection is very slow compared to using no bounded capacity with .TryAdd() in the same process.

I achieved much better performance by implementing my own bounding strategy. There are many ways to do this; three options are Thread.Sleep(), Thread.SpinWait(), or Monitor.Wait() used together with Monitor.PulseAll(). When one of these strategies is used, it is possible to use BlockingCollection.TryAdd() instead of BlockingCollection.Add() and have no bounded capacity at all, without losing any data or running out of memory: with no bounded capacity, TryAdd() never has to wait for free space, so it does not reject items for lack of room.

You can choose among the three examples based on which one best fits the speed difference between your producer and consumer threads.

Thread.Sleep() Example:

// Tokens: the BlockingCollection (no bounded capacity); SleepTime: back-off in milliseconds.
// Wait while the collection holds more than 50,000 items (the manual limit).
while (Tokens.Count > 50000)
{
    // Over the limit: put this producer thread to sleep, then check again.
    Thread.Sleep(SleepTime);
}
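A self-contained sketch of this Thread.Sleep() approach, assuming a single producer thread, int items, an illustrative limit of 1,000 instead of 50,000, and a hypothetical helper named RunSleepThrottle (C# top-level statements, .NET 6 or later assumed). The collection has no bounded capacity; the TryAdd() return value is still checked defensively.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

const int Limit = 1_000; // illustrative manual limit (the answer uses 50,000)
const int SleepTime = 1; // illustrative back-off in milliseconds

// Hypothetical helper: one producer throttled with Thread.Sleep() in front of an
// unbounded BlockingCollection. Returns items received and the largest Count the producer saw.
(int received, int peak) RunSleepThrottle(int itemCount)
{
    using var tokens = new BlockingCollection<int>(new ConcurrentBag<int>()); // no bounded capacity
    int received = 0;

    var consumer = new Thread(() =>
    {
        foreach (int item in tokens.GetConsumingEnumerable())
        {
            received++;          // only the consumer thread writes this
            Thread.SpinWait(50); // simulate a consumer that is slower than the producer
        }
    });
    consumer.Start();

    int peak = 0;
    for (int i = 0; i < itemCount; i++)
    {
        while (tokens.Count > Limit) // manual bound: back off while over the limit
        {
            Thread.Sleep(SleepTime);
        }
        if (!tokens.TryAdd(i))       // not expected to fail without a bounded capacity
        {
            throw new InvalidOperationException("TryAdd rejected an item");
        }
        peak = Math.Max(peak, tokens.Count);
    }

    tokens.CompleteAdding();
    consumer.Join();
    return (received, peak);
}

Console.WriteLine(RunSleepThrottle(20_000));
```

With a single producer the count never exceeds Limit + 1. With several producer threads, the Count check and the TryAdd() call are not atomic, so the collection can overshoot the limit by up to the number of producers.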

Thread.SpinWait() Example:

// SpinCount: number of spin iterations per check (defined elsewhere).
// Wait while the collection holds more than 50,000 items (the manual limit).
while (Tokens.Count > 50000)
{
    // Over the limit: spin briefly without giving up the CPU, then check again.
    Thread.SpinWait(SpinCount);
}

Monitor.Wait() Example:

This example requires a hook on both the producer and consumer sides; both hooks use the same lock object, syncLock.

Producer Code

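// Producer side, before each TryAdd(): cheap check without taking the lock.
if (Tokens.Count > 50000)
{
    lock (syncLock)
    {
        // Re-check under the lock and keep waiting until the count drops.
        // The 1000 ms timeout guards against a missed pulse.
        while (Tokens.Count > 50000)
        {
            Monitor.Wait(syncLock, 1000);
        }
    }
}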

Consumer Code

// Consumer side, after each item is taken: has the collection drained back to a normal range?
if (Tokens.Count <= 40000)
{
    lock (syncLock)
    {
        // Re-check under the lock before waking the waiting producers.
        if (Tokens.Count <= 40000)
        {
            Monitor.PulseAll(syncLock);
        }
    }
}
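Putting the producer and consumer hooks together, a runnable sketch might look like this. It assumes one producer and one consumer, an unbounded collection, illustrative thresholds of 1,000 and 800 instead of 50,000 and 40,000, and a hypothetical helper named RunMonitorThrottle (C# top-level statements, .NET 6 or later assumed). The producer re-checks the count in a while loop so that a timeout or a spurious wake-up cannot let it add past the limit.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

const int HighWater = 1_000; // producer waits above this (50,000 in the answer)
const int LowWater = 800;    // consumer pulses at or below this (40,000 in the answer)

// Hypothetical helper: one producer and one consumer sharing an unbounded BlockingCollection,
// throttled with Monitor.Wait()/Monitor.PulseAll(). Returns items received and peak Count.
(int received, int peak) RunMonitorThrottle(int itemCount)
{
    var syncLock = new object();
    using var tokens = new BlockingCollection<int>(new ConcurrentBag<int>()); // no bounded capacity
    int received = 0;

    var consumer = new Thread(() =>
    {
        foreach (int item in tokens.GetConsumingEnumerable())
        {
            received++;
            Thread.SpinWait(50);          // simulate a slow consumer
            if (tokens.Count <= LowWater) // consumer hook: the backlog has drained
            {
                lock (syncLock) { Monitor.PulseAll(syncLock); }
            }
        }
    });
    consumer.Start();

    int peak = 0;
    for (int i = 0; i < itemCount; i++)
    {
        if (tokens.Count > HighWater) // producer hook: cheap check before taking the lock
        {
            lock (syncLock)
            {
                // Re-check after every wake-up; the 1000 ms timeout covers a missed pulse.
                while (tokens.Count > HighWater)
                {
                    Monitor.Wait(syncLock, 1000);
                }
            }
        }
        if (!tokens.TryAdd(i))
        {
            throw new InvalidOperationException("TryAdd rejected an item");
        }
        peak = Math.Max(peak, tokens.Count);
    }

    tokens.CompleteAdding();
    consumer.Join();
    return (received, peak);
}

Console.WriteLine(RunMonitorThrottle(20_000));
```

The gap between the two thresholds means a woken producer can add roughly 200 items before it has to wait again, rather than waking and sleeping on every item.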
Emanuele answered 3/11, 2012 at 19:27
Comment:
Not sure if this is still relevant or if the behavior has changed vs. 10 years ago, but in .NET Core, the BlockingCollection.Add() method blocks the producer until there is capacity (see learn.microsoft.com/en-us/dotnet/standard/collections/…): "if the collection reaches its specified maximum capacity, the producing threads will block until an item is removed". This would not cause spurious memory usage, imo, and the 'slowness' mentioned in this answer would be intentional. – Microscopium
