I have a concurrent BlockingCollection with repeated elements. How can I modify it to add or get distinct elements?
The default backing store for BlockingCollection is a ConcurrentQueue. As somebody else pointed out, it's rather difficult to add distinct items using that. However, you could create your own collection type that implements IProducerConsumerCollection and pass that to the BlockingCollection constructor.
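To illustrate that constructor overload, here is a small sketch (assuming .NET 6 or later with top-level statements) that swaps in ConcurrentStack<T>, another built-in IProducerConsumerCollection<T>, purely to show that the backing store decides the ordering. It does nothing about duplicates; the variable names are illustrative.

```csharp
using System;
using System.Collections.Concurrent;

// With no argument, BlockingCollection<T> wraps a ConcurrentQueue<T> (FIFO).
var fifo = new BlockingCollection<int>();

// Any IProducerConsumerCollection<T> can be passed in instead; a stack gives LIFO.
var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());

foreach (int i in new[] { 1, 2, 3 })
{
    fifo.Add(i);
    lifo.Add(i);
}

Console.WriteLine(fifo.Take()); // 1 (first in, first out)
Console.WriteLine(lifo.Take()); // 3 (last in, first out)
```

A duplicate-rejecting store plugs in the same way: BlockingCollection<T> only calls the wrapped collection's TryAdd and TryTake, so all the de-duplication logic lives in the custom type.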
Imagine a ConcurrentDictionary that contains the keys of the items that are currently in the queue. To add an item, you first call TryAdd on the dictionary; if it succeeds (meaning the item wasn't already there), you also add the item to the queue, and if it fails, you reject the item as a duplicate. Take (and TryTake) get the next item from the queue, remove its key from the dictionary, and return it.
I'd prefer it if there were a concurrent hash set, but since there isn't one, you'll have to make do with ConcurrentDictionary.
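A minimal sketch of the dictionary-plus-queue idea, assuming .NET 6 or later with top-level statements. DistinctQueue<T>, TryEnqueue and TryDequeue are hypothetical names, not library types, and ConcurrentDictionary<T, byte> stands in for the missing concurrent set (the byte value is ignored). It is a plain class rather than a full IProducerConsumerCollection<T>, so it only illustrates the add/take logic.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;

var q = new DistinctQueue<string>();
Console.WriteLine(q.TryEnqueue("a")); // True
Console.WriteLine(q.TryEnqueue("a")); // False: an equal item is already queued
q.TryDequeue(out var first);          // takes "a" out of the queue and the key set
Console.WriteLine(first);             // a
Console.WriteLine(q.TryEnqueue("a")); // True: "a" is no longer queued

// Hypothetical illustration of the dictionary-plus-queue idea; not a library type.
public class DistinctQueue<T> where T : notnull
{
    // Keys of the items currently queued. The byte value is unused:
    // the dictionary acts as a concurrent hash set.
    private readonly ConcurrentDictionary<T, byte> _keys = new();
    private readonly ConcurrentQueue<T> _queue = new();

    public int Count => _queue.Count;

    // Returns false (and enqueues nothing) if an equal item is already queued.
    public bool TryEnqueue(T item)
    {
        if (!_keys.TryAdd(item, 0))
            return false;
        _queue.Enqueue(item);
        return true;
    }

    // Takes the next item and forgets its key so an equal item can be added again.
    // Between the dequeue and the key removal there is a short window in which
    // an equal item is still rejected.
    public bool TryDequeue([MaybeNullWhen(false)] out T item)
    {
        if (!_queue.TryDequeue(out item))
            return false;
        _keys.TryRemove(item, out _);
        return true;
    }
}
```

Because the dictionary update and the queue update are two separate atomic operations rather than one, this is looser than a lock-based design that keeps both structures in sync. Also, returning false on a duplicate is fine for a standalone class, but an IProducerConsumerCollection<T>.TryAdd that returns false makes BlockingCollection<T> throw, as the caution further down explains.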
Here is an implementation of an IProducerConsumerCollection<T> that behaves like a queue and also rejects duplicate items:
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

public class ConcurrentQueueNoDuplicates<T> : IProducerConsumerCollection<T>
{
    private readonly Queue<T> _queue = new();
    private readonly HashSet<T> _set;

    // All members synchronize on the private queue instance.
    private object Locker => _queue;

    public ConcurrentQueueNoDuplicates(IEqualityComparer<T> comparer = default)
    {
        _set = new(comparer);
    }

    public bool TryAdd(T item)
    {
        lock (Locker)
        {
            // Duplicates are rejected by throwing, not by returning false (see the caution below).
            if (!_set.Add(item))
                throw new DuplicateKeyException();
            _queue.Enqueue(item);
            return true;
        }
    }

    public bool TryTake(out T item)
    {
        lock (Locker)
        {
            // BlockingCollection<T> calls TryTake only after its internal count
            // says an item is available, so an empty queue here indicates misuse.
            if (_queue.Count == 0)
                throw new InvalidOperationException();
            item = _queue.Dequeue();
            bool removed = _set.Remove(item);
            Debug.Assert(removed);
            return true;
        }
    }

    public int Count { get { lock (Locker) return _queue.Count; } }
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public T[] ToArray() { lock (Locker) return _queue.ToArray(); }

    // Enumerates a snapshot, so callers never hold the lock while iterating.
    public IEnumerator<T> GetEnumerator() => ToArray().AsEnumerable().GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(T[] array, int index) => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}

public class DuplicateKeyException : InvalidOperationException { }
Usage example:
BlockingCollection<Item> queue = new(new ConcurrentQueueNoDuplicates<Item>());
//...
try { queue.Add(item); }
catch (DuplicateKeyException) { Console.WriteLine($"The {item} was rejected."); }
Caution: Calling queue.TryAdd(item) does not return false when the item is a duplicate, as one might expect. Any attempt to add a duplicate item invariably results in a DuplicateKeyException. Do not attempt to "fix" the above ConcurrentQueueNoDuplicates<T>.TryAdd implementation, or the TryTake implementation, by returning false. The BlockingCollection<T> will react by throwing a different exception (InvalidOperationException), and on top of that its internal state will become corrupted. There is currently (.NET 7) a bug that reduces by one the effective capacity of a BlockingCollection<T> whose underlying storage has a TryAdd implementation that returns false. The bug has been fixed for .NET 8, which will prevent the corruption, but it won't change the error-throwing behavior.
BlockingCollection does implement IEnumerable<T>, so you can use Distinct(). – Sarina
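A minimal sketch of what the comment suggests, assuming .NET 6 or later with top-level statements (the variable names are illustrative). Enumerating a BlockingCollection<T> reads a snapshot of its backing store without consuming anything, so Distinct() only filters what you see; the duplicates stay in the collection and a consumer calling Take() will still receive them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

var bc = new BlockingCollection<int>();
foreach (int x in new[] { 1, 2, 2, 3, 1 })
    bc.Add(x);

// LINQ enumerates a snapshot of the underlying ConcurrentQueue<T>;
// it does not take items out of the collection.
int[] distinct = bc.Distinct().ToArray();

Console.WriteLine(string.Join(",", distinct)); // 1,2,3
Console.WriteLine(bc.Count);                   // 5: the duplicates are still queued
```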