Concurrent collections and unique elements

2

6

I have a concurrent BlockingCollection with repeated elements. How can I modify it so that it adds or returns only distinct elements?

Dumps answered 4/10, 2011 at 18:50 Comment(2)
Adding distinct elements appears to be surprisingly hard, if not impossible. BlockingCollection does implement IEnumerable<T>, so you can use Distinct() when reading. - Sarina
You could use the code provided in the StackOverflow question linked by GertArnold, but simply always return true in the TryAdd and TryTake implementations of the IProducerConsumerCollection. I know it's terrible form, but it would work, and the BlockingCollection is already breaking form in that it throws an exception. How terribly annoying. - Bobsledding
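As a rough illustration of the Distinct() suggestion in the first comment, the sketch below filters duplicates on the reading side only. The sample values are made up; the point is that enumerating the collection directly works on a snapshot and removes nothing, while GetConsumingEnumerable() removes items as it yields them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

var collection = new BlockingCollection<int>();
foreach (int n in new[] { 1, 2, 2, 3, 1 })   // made-up sample data with duplicates
    collection.Add(n);
collection.CompleteAdding();   // lets the consuming enumeration below terminate

// Enumerating the collection directly reads a snapshot; nothing is removed.
int distinctInSnapshot = collection.Distinct().Count();
int stillStored = collection.Count;

// GetConsumingEnumerable() removes each item as it is yielded.
int[] consumed = collection.GetConsumingEnumerable().Distinct().ToArray();
int leftAfterConsuming = collection.Count;

Console.WriteLine($"{distinctInSnapshot} distinct, {stillStored} stored, " +
                  $"{consumed.Length} consumed, {leftAfterConsuming} left");
```

Note that this only hides duplicates from the reader. The duplicates are still stored, and still count against any bounded capacity, until they are taken.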
6

The default backing store for BlockingCollection is a ConcurrentQueue. As somebody else pointed out, it's rather difficult to add distinct items using that.

However, you could create your own collection type that implements IProducerConsumerCollection, and pass that to the BlockingCollection constructor.

Imagine a ConcurrentDictionary that contains the keys of the items that are currently in the queue. To add an item, you call TryAdd on the dictionary first; if that succeeds (the key was not already present), you also add the item to the queue. Take (and TryTake) get the next item from the queue, remove its key from the dictionary, and return the item.

I'd prefer it if there were a concurrent HashSet, but since there isn't one, you'll have to make do with ConcurrentDictionary.
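The dictionary-plus-queue idea described above could be sketched roughly as follows. This is only an illustration under stated assumptions: the class name DistinctQueueSketch is hypothetical, the byte dictionary value is a dummy, and only the two core operations are shown rather than a full IProducerConsumerCollection<T> implementation.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical name; a sketch of the idea, not a complete collection type.
public sealed class DistinctQueueSketch<T> where T : notnull
{
    // Keys of the items currently in the queue; the byte value is a dummy.
    private readonly ConcurrentDictionary<T, byte> _keys = new();
    private readonly ConcurrentQueue<T> _queue = new();

    public bool TryAdd(T item)
    {
        // Only the caller that wins the dictionary insert may enqueue the item.
        if (!_keys.TryAdd(item, 0)) return false;
        _queue.Enqueue(item);
        return true;
    }

    public bool TryTake(out T item)
    {
        if (!_queue.TryDequeue(out item)) return false;
        // Forget the key so that the same item can be added again later.
        _keys.TryRemove(item, out _);
        return true;
    }

    public int Count => _queue.Count;
}
```

The two structures are updated in separate steps, so there is a short window after an item is dequeued, and before its key is removed, in which re-adding that item is still rejected. Also keep in mind that BlockingCollection<T> throws an InvalidOperationException when the underlying TryAdd returns false, so a rejected duplicate surfaces as an exception rather than as a false return value.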

Selfrising answered 4/10, 2011 at 21:16 Comment(0)
0

Here is an implementation of an IProducerConsumerCollection<T> collection that behaves like a queue and also rejects duplicate items:

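using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

// A FIFO queue guarded by a single lock; a HashSet tracks the items currently queued.
public class ConcurrentQueueNoDuplicates<T> : IProducerConsumerCollection<T>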
{
    private readonly Queue<T> _queue = new();
    private readonly HashSet<T> _set;
    private object Locker => _queue;

    public ConcurrentQueueNoDuplicates(IEqualityComparer<T> comparer = default)
    {
        _set = new(comparer);
    }

    public bool TryAdd(T item)
    {
        lock (Locker)
        {
            if (!_set.Add(item))
                throw new DuplicateKeyException();
            _queue.Enqueue(item);
            return true;
        }
    }

    public bool TryTake(out T item)
    {
        lock (Locker)
        {
            if (_queue.Count == 0)
                throw new InvalidOperationException();
            item = _queue.Dequeue();
            bool removed = _set.Remove(item);
            Debug.Assert(removed);
            return true;
        }
    }

    public int Count { get { lock (Locker) return _queue.Count; } }
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public T[] ToArray() { lock (Locker) return _queue.ToArray(); }
    public IEnumerator<T> GetEnumerator() => ToArray().AsEnumerable().GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(T[] array, int index) => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}

public class DuplicateKeyException : InvalidOperationException { }

Usage example:

BlockingCollection<Item> queue = new(new ConcurrentQueueNoDuplicates<Item>());

//...

try { queue.Add(item); }
catch (DuplicateKeyException) { Console.WriteLine($"The {item} was rejected."); }

Caution: Calling queue.TryAdd(item) does not return false for a duplicate item, as you might expect. Any attempt to add a duplicate invariably results in a DuplicateKeyException. Do not attempt to "fix" the ConcurrentQueueNoDuplicates<T>.TryAdd implementation above, or the TryTake, by returning false. The BlockingCollection<T> will react by throwing a different exception (InvalidOperationException), and on top of that its internal state will become corrupted. As of .NET 7, there is a bug that reduces by one the effective capacity of a BlockingCollection<T> whose underlying storage has a TryAdd implementation that returns false. The bug has been fixed for .NET 8, which prevents the corruption but does not change the exception-throwing behavior.
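If a bool-returning call is still wanted at the call site, one option is to wrap Add in a small helper that catches the rejection exception. The sketch below is an assumption, not part of BlockingCollection<T>: the extension method name TryAddCatching is hypothetical. To stay self-contained, the demo triggers an InvalidOperationException by adding after CompleteAdding, instead of using the duplicate-rejecting queue.

```csharp
using System;
using System.Collections.Concurrent;

// Demo: Add throws InvalidOperationException once adding has been completed.
var demo = new BlockingCollection<int>();
bool first = demo.TryAddCatching<int, InvalidOperationException>(1);
demo.CompleteAdding();
bool second = demo.TryAddCatching<int, InvalidOperationException>(2);
Console.WriteLine($"first: {first}, second: {second}");

public static class BlockingCollectionExtensions
{
    // Hypothetical helper: turns a specific exception thrown by Add into a false result.
    // Any other exception type still propagates to the caller.
    public static bool TryAddCatching<T, TRejected>(
        this BlockingCollection<T> collection, T item)
        where TRejected : Exception
    {
        try
        {
            collection.Add(item);
            return true;
        }
        catch (TRejected)
        {
            return false;
        }
    }
}
```

With the collection from the usage example, the call would presumably look like queue.TryAddCatching<Item, DuplicateKeyException>(item). The exception is still thrown and caught internally, so this changes only the shape of the call site, not the cost of a rejection.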

Halcyon answered 5/2, 2023 at 18:39 Comment(0)
