How to access the underlying default concurrent queue of a blocking collection?
P

4

10

I have multiple producers and a single consumer. However, if an item is already in the queue and has not yet been consumed, a producer should not queue it again. (In other words: a unique, no-duplicates blocking collection that uses the default concurrent queue.)

if (!myBlockingColl.Contains(item))
    myBlockingColl.Add(item);

However, the blocking collection does not have a Contains method, nor does it provide any kind of TryPeek()-like method. How can I access the underlying concurrent queue so I can do something like this?

if (!myBlockingColl.myConcurQ.trypeek(item))
  myBlockingColl.Add(item);

In a tail spin?

Palembang answered 6/7, 2011 at 22:48 Comment(2)
If there were to be such a method then it would have to be a single atomic method. It could never be implemented as your two excerpts are, because another thread could add in between TryPeek and Add. I think you are out of luck. Why do you need such a capability anyway?Sideband
In my case it is OK if another thread adds between the TryPeek and the Add. I am mostly doing this as a safety check: a scheduler fires report generation triggers on multiple threads (hence multiple producers). If the scheduler malfunctions for whatever reason and fires the same trigger multiple times in a short span, I just wanted to handle it. I understand I can work around it one way or another. Looks like there is no elegant way of handling this. Thank you.Palembang
A
9

This is an interesting question. This is the first time I have seen someone ask for a blocking queue that ignores duplicates. Oddly enough, I could find nothing like what you want already in the BCL. I say this is odd because BlockingCollection can accept an IProducerConsumerCollection as the underlying collection, and that interface has a TryAdd method that is advertised as being able to fail when duplicates are detected. The problem is that I see no concrete implementation of IProducerConsumerCollection that prevents duplicates. At least we can write our own.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  // TODO: You will need to fully implement IProducerConsumerCollection.

  private Queue<T> queue = new Queue<T>();

  public bool TryAdd(T item)
  {
    lock (queue)
    {
      if (!queue.Contains(item))
      {
        queue.Enqueue(item);
        return true;
      }
      return false;
    }
  }

  public bool TryTake(out T item)
  {
    lock (queue)
    {
      if (queue.Count > 0)
      {
        item = queue.Dequeue();
        return true;
      }
      // T may be a value type, so use default(T) rather than null.
      item = default(T);
      return false;
    }
  }
}

Now that we have our IProducerConsumerCollection that does not accept duplicates we can use it like this:

public class Example
{
  private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());

  public Example()
  {
    new Thread(Consume).Start();
  }

  public void Produce(object item)
  {
    bool unique = queue.TryAdd(item);
  }

  private void Consume()
  {
    while (true)
    {
      object item = queue.Take();
    }
  }
}

You may not like my implementation of NoDuplicatesConcurrentQueue. You are certainly free to implement your own using ConcurrentQueue or whatever if you think you need the low-lock performance that the TPL collections provide.

Update:

I was able to test the code this morning. There is some good news and bad news. The good news is that this will technically work. The bad news is that you probably will not want to do this because BlockingCollection.TryAdd intercepts the return value from the underlying IProducerConsumerCollection.TryAdd method and throws an exception when false is detected. Yep, that is right. It does not return false like you would expect and instead generates an exception. I have to be honest, this is both surprising and ridiculous. The whole point of the TryXXX methods is that they should not throw exceptions. I am deeply disappointed.
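
If the exception is the only obstacle, one option is to catch it at the call site. The following is a minimal sketch, not part of the original answer: the extension method name TryAddOrFalse is hypothetical, and it assumes the InvalidOperationException comes from the underlying collection rejecting the item. Because BlockingCollection also throws InvalidOperationException once adding has been completed, the filter uses IsAddingCompleted as a best-effort (and inherently racy) way to let that case propagate.

```csharp
using System;
using System.Collections.Concurrent;

public static class BlockingCollectionExtensions
{
    // Hypothetical helper: reports a rejected item as 'false' instead of
    // letting the InvalidOperationException escape to the producer.
    public static bool TryAddOrFalse<T>(this BlockingCollection<T> collection, T item)
    {
        try
        {
            return collection.TryAdd(item);
        }
        catch (InvalidOperationException) when (!collection.IsAddingCompleted)
        {
            // Assumption: the underlying collection refused the item
            // (for example, because it was a duplicate).
            return false;
        }
    }
}
```

Catching an exception on every duplicate is comparatively expensive, so this only seems reasonable when duplicates are rare, as in the scheduler scenario described in the question.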

Antipodes answered 7/7, 2011 at 1:6 Comment(7)
Brian, think you the man. Let me digest this and I will update tomorrow if I have found my answer. thanksPalembang
The fact that the entire IProducerConsumerCollection needs to be implemented gives me a re-inventing the wheel feeling. There must be a better way. Upvoted your solution since I may end up doing something similar. thank youPalembang
@Gullu: I just updated my answer. I do not think you will want to use this strategy anyway. Read my update.Antipodes
wow. Appreciate your efforts in this regards. Even though I did not find what I was looking for I learned many other things from your post. thanksPalembang
The exception thrown when the underlying collection returns false is described in MSDN as "If the item is a duplicate, and the underlying collection does not accept duplicate items, then an InvalidOperationException is thrown.". I believe the logic here is that the 'Try' prefix refers to cases that belong to BlockingCollection itself. Problems with the underlying collection surface as exceptions.Omarr
@BrianGideon Re: exception. Agree, somewhat disappointing, although exceptions can be caught - incurring a performance penalty, which might be an issue. But this way you can actually distinguish between "bounded producerConsumerCollection was full" and "pcc did not accept item for another reason".Selah
Warning: up to and including .NET 7 the BlockingCollection<T> has a bug that causes corruption of its bounded capacity in case the underlying collection's TryAdd returns false. The code in this answer triggers this bug. The workaround is to throw an exception instead of returning false, as shown here.Readiness
S
6

In addition to the caveat Brian Gideon mentioned in his update, his solution suffers from these performance issues:

  • O(n) operations on the queue (queue.Contains(item)) have a severe impact on performance as the queue grows
  • locks limit concurrency (which he does mention)

The following code improves on Brian's solution by

  • using a hash set to do O(1) lookups
  • combining 2 data structures from the System.Collections.Concurrent namespace

N.B. As there is no ConcurrentHashSet, I'm using a ConcurrentDictionary, ignoring the values.

In this rare case it is luckily possible to simply compose a more complex concurrent data structure out of multiple simpler ones, without adding locks. The order of operations on the 2 concurrent data structures is important here.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>();
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public bool TryAdd(T item)
    {
        if (existingElements.TryAdd(item, false))
        {
            queue.Enqueue(item);
            return true;
        }
        return false;
    }

    public bool TryTake(out T item)
    {
        if (queue.TryDequeue(out item))
        {
            bool _;
            existingElements.TryRemove(item, out _);
            return true;
        }
        return false;
    }
    ...
}

N.B. Another way of looking at this problem: You want a set that preserves the insertion order.
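
For readers who want something that compiles, here is one possible way to fill in the members elided above. This is a sketch under assumptions, not the answerer's code: the class name UniqueConcurrentQueue is hypothetical, the remaining interface members simply delegate to the inner ConcurrentQueue<T>, and SyncRoot is left unsupported. It still returns false for duplicates, so a BlockingCollection<T> wrapping it will throw InvalidOperationException on a duplicate, and the bounded-capacity warning in the comments applies if a capacity is specified.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical name; a lock-free queue that rejects items already pending.
public class UniqueConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentDictionary<T, bool> existingElements =
        new ConcurrentDictionary<T, bool>();
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public bool TryAdd(T item)
    {
        // Register the item first; only the thread that wins enqueues it.
        if (!existingElements.TryAdd(item, false))
            return false;
        queue.Enqueue(item);
        return true;
    }

    public bool TryTake(out T item)
    {
        // Dequeue first, then forget the item so it can be added again.
        if (!queue.TryDequeue(out item))
            return false;
        existingElements.TryRemove(item, out _);
        return true;
    }

    // The remaining members delegate to the inner queue.
    public int Count => queue.Count;
    public T[] ToArray() => queue.ToArray();
    public void CopyTo(T[] array, int index) => queue.CopyTo(array, index);
    public IEnumerator<T> GetEnumerator() => queue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    void ICollection.CopyTo(Array array, int index) =>
        ((ICollection)queue).CopyTo(array, index);
    bool ICollection.IsSynchronized => false;
    object ICollection.SyncRoot =>
        throw new NotSupportedException("SyncRoot is not supported in this sketch.");
}
```

Note that between TryDequeue and TryRemove a producer adding the same item is still rejected, even though the item has already left the queue. For the "ignore rapid repeat triggers" use case in the question that window is probably acceptable.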

Selah answered 6/6, 2014 at 2:48 Comment(1)
Warning: up to and including .NET 7 the BlockingCollection<T> has a bug that causes corruption of its bounded capacity in case the underlying collection's TryAdd returns false. The code in this answer triggers this bug. The workaround is to throw an exception instead of returning false, as shown here.Readiness
W
1

I would suggest wrapping the check and the add in a lock, so that the two steps execute atomically and no other producer can add the same item in between. For example, with any IEnumerable:

object bcLocker = new object();

// ...

lock (bcLocker)
{
    bool foundTheItem = false;
    foreach (someClass nextItem in myBlockingColl)
    {
        if (nextItem.Equals(item))
        {
            foundTheItem = true;
            break;
        }
    }
    if (foundTheItem == false)
    {
        // Add here
    }
}
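
The same idea can be packaged as a small wrapper. The sketch below is only an illustration: the class name UniqueProducer and the method AddIfAbsent are hypothetical. It relies on BlockingCollection<T> implementing IEnumerable<T>, so the LINQ Contains extension can scan a snapshot of the pending items. It assumes an unbounded collection; with a bounded one, Add could block while the lock is held.

```csharp
using System.Collections.Concurrent;
using System.Linq;

// Hypothetical wrapper: all producers must add through the same instance,
// otherwise the lock does not protect the check-then-add sequence.
public sealed class UniqueProducer<T>
{
    private readonly object addLock = new object();
    private readonly BlockingCollection<T> collection;

    public UniqueProducer(BlockingCollection<T> collection)
    {
        this.collection = collection;
    }

    public bool AddIfAbsent(T item)
    {
        lock (addLock)
        {
            // O(n) scan over a snapshot of the items not yet consumed.
            if (collection.Contains(item))
                return false;

            collection.Add(item);
            return true;
        }
    }
}
```

The consumer does not need the lock: if it takes an item just after the scan, the worst case is that one repeat request is skipped even though the item has already been consumed.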
Waylonwayman answered 6/7, 2011 at 23:54 Comment(3)
I understand it can be done with locking the old-fashioned way. Since I am using the new .NET 4 classes, which for the most part work well without explicit locking, I was hoping for something more cool. Thanks.Palembang
There isn't a "more cool" way -- nothing will work without locking. Even two adds could collide.Waylonwayman
Every operation on the collection that could change it needs protecting with this lock.Sideband
R
0

How to access the underlying default concurrent queue of a blocking collection?

The BlockingCollection<T> is backed by a ConcurrentQueue<T> by default. In other words, if you don't explicitly specify its backing storage, it will create a ConcurrentQueue<T> behind the scenes. Since you want direct access to the underlying storage, you can manually create a ConcurrentQueue<T> and pass it to the constructor of the BlockingCollection<T>:

ConcurrentQueue<Item> queue = new();
BlockingCollection<Item> collection = new(queue);

Unfortunately the ConcurrentQueue<T> collection doesn't have a TryPeek method with an input parameter, so what you intend to do is not possible:

if (!queue.TryPeek(item)) // Compile error, missing out keyword
    collection.Add(item);

Also be aware that the queue is now owned by the collection. If you attempt to mutate it directly (by issuing Enqueue or TryDequeue commands), the collection will throw exceptions.
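
Read-only inspection of the queue should be harmless, though. The sketch below is an illustration with hypothetical names (QueueInspectionDemo, the "report-…" strings): it shows what TryPeek actually offers, namely an out parameter carrying the head element, and a non-atomic membership test using the LINQ Contains extension. Another producer could still add the same item between the check and the Add, which the asker said is acceptable in their case.

```csharp
using System.Collections.Concurrent;
using System.Linq;

public static class QueueInspectionDemo
{
    public static string Run()
    {
        var queue = new ConcurrentQueue<string>();
        var collection = new BlockingCollection<string>(queue);

        // Always mutate through the collection, never through the queue.
        collection.Add("report-A");
        collection.Add("report-B");

        // TryPeek takes an out parameter and exposes only the head element.
        queue.TryPeek(out string head);

        // Read-only, O(n), non-atomic membership test on the queue.
        if (!queue.Contains("report-B"))
            collection.Add("report-B"); // skipped: already pending

        return head + ":" + collection.Count; // "report-A:2"
    }
}
```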

Readiness answered 5/2, 2023 at 23:45 Comment(0)
