Here is an implementation of an IProducerConsumerCollection<T> collection that is backed by a ConcurrentDictionary<TKey, TValue>. The T of the collection is of type KeyValuePair<TKey, TValue>. It is very similar to Nick Jones's implementation, with some improvements:

    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;

    public class ConcurrentDictionaryProducerConsumer<TKey, TValue>
        : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
    {
        private readonly ConcurrentDictionary<TKey, TValue> _dictionary;
        private readonly ThreadLocal<IEnumerator<KeyValuePair<TKey, TValue>>> _enumerator;

        public ConcurrentDictionaryProducerConsumer(
            IEqualityComparer<TKey> comparer = default)
        {
            _dictionary = new(comparer);
            // One enumerator per thread, created lazily on first use.
            _enumerator = new(() => _dictionary.GetEnumerator());
        }

        public bool TryAdd(KeyValuePair<TKey, TValue> entry)
        {
            // Throw a dedicated exception instead of returning false,
            // so that the caller can recognize a rejected duplicate key.
            if (!_dictionary.TryAdd(entry.Key, entry.Value))
                throw new DuplicateKeyException();
            return true;
        }

        public bool TryTake(out KeyValuePair<TKey, TValue> entry)
        {
            // Get a cached enumerator that is used only by the current thread.
            IEnumerator<KeyValuePair<TKey, TValue>> enumerator = _enumerator.Value;
            while (true)
            {
                // Restart from the beginning and pick the first entry found.
                enumerator.Reset();
                if (!enumerator.MoveNext())
                    throw new InvalidOperationException(); // The dictionary is empty.
                entry = enumerator.Current;
                // Another thread may have removed this entry first; if so, retry.
                if (!_dictionary.TryRemove(entry)) continue;
                return true;
            }
        }

        public int Count => _dictionary.Count;
        public bool IsSynchronized => false;
        public object SyncRoot => throw new NotSupportedException();
        public KeyValuePair<TKey, TValue>[] ToArray() => _dictionary.ToArray();
        public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
            => _dictionary.GetEnumerator();
        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
        public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
            => throw new NotSupportedException();
        public void CopyTo(Array array, int index) => throw new NotSupportedException();
    }

    public class DuplicateKeyException : InvalidOperationException { }

Usage example:

    BlockingCollection<KeyValuePair<string, Item>> collection
        = new(new ConcurrentDictionaryProducerConsumer<string, Item>());

    //...

    try { collection.Add(KeyValuePair.Create(key, item)); }
    catch (DuplicateKeyException) { Console.WriteLine($"The {key} was rejected."); }

The collection.TryTake method removes a practically random key from the ConcurrentDictionary, which is unlikely to be desirable behavior. Also, the performance is not great, and the memory allocations are significant. For these reasons I can't enthusiastically recommend using the above implementation. I would suggest instead taking a look at the ConcurrentQueueNoDuplicates<T> that I have posted here, which has proper queue behavior.
Caution: Calling collection.TryAdd(item) does not have the expected behavior of returning false if the key already exists. Any attempt to add a duplicate key invariably results in a DuplicateKeyException. For an explanation, see the aforementioned other post.
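
To illustrate why the underlying collection throws instead of simply returning false, here is a minimal sketch. As far as I know, BlockingCollection<T> treats a false result from the underlying collection's TryAdd as an error and throws an InvalidOperationException, so the false never reaches the caller. The RejectingCollection<T> and Demo types below are hypothetical names used only for this demonstration; they are not part of the implementation above.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical collection, not part of the answer: it rejects every item
// by returning false from TryAdd.
public class RejectingCollection<T> : IProducerConsumerCollection<T>
{
    public bool TryAdd(T item) => false;
    public bool TryTake(out T item) { item = default; return false; }
    public int Count => 0;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public T[] ToArray() => Array.Empty<T>();
    public IEnumerator<T> GetEnumerator() { yield break; }
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(T[] array, int index) { }
    public void CopyTo(Array array, int index) { }
}

public static class Demo
{
    // Returns true if BlockingCollection<T>.TryAdd throws
    // instead of returning false for a rejected item.
    public static bool TryAddThrows()
    {
        using var collection = new BlockingCollection<int>(new RejectingCollection<int>());
        try
        {
            collection.TryAdd(42);
            return false; // The rejection surfaced as a return value.
        }
        catch (InvalidOperationException)
        {
            return true; // The rejection surfaced as an exception.
        }
    }

    public static void Main()
        => Console.WriteLine($"TryAdd threw: {TryAddThrows()}");
}
```

Because the wrapper throws a plain InvalidOperationException in this situation, deriving DuplicateKeyException from InvalidOperationException lets callers distinguish a rejected duplicate from other failures by catching the more specific type.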