How to wrap ConcurrentDictionary in BlockingCollection?

3

8

I am trying to use a ConcurrentDictionary wrapped in a BlockingCollection, but so far without success.

I understand that collections with a single type parameter, such as ConcurrentBag<T> and ConcurrentQueue<T>, work with BlockingCollection.

So, to create a ConcurrentBag wrapped in a BlockingCollection I would declare and instantiate like this:

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>());

But how can I do the same for a ConcurrentDictionary? I need the blocking functionality of the BlockingCollection on both the producer and the consumer side.

Malarkey answered 24/5, 2012 at 11:2 Comment(4)
Dictionary (and ConcurrentDictionary too) doesn't preserve the order of items. Can you describe your producer-consumer scenario? - Afc
@Dennis, I am aware of that. A producer stores KeyValuePairs in the ConcurrentDictionary, and a consumer task increments an int and removes the KeyValuePair whose key matches that int. I do this because worker tasks populate the ConcurrentDictionary with values in arbitrary order, and the consumer task ensures that received values are passed on and processed in the right order. Can a ConcurrentDictionary be wrapped in a BlockingCollection? - Malarkey
What solution did you come up with? I'm trying to find a good solution to a similar problem where the producer doesn't produce items in the order needed by the consumer. (Old post, I know, but it's worth a try.) - Marcellamarcelle
Related: Concurrent collections and unique elements. - Fineberg
B
5

Maybe what you need is a ConcurrentDictionary of BlockingCollection instances, one per key:

        // Requires: System, System.Collections.Concurrent, System.Threading, System.Threading.Tasks
        var mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
        int maxBoxes = 5;

        var cancellationTokenSource = new CancellationTokenSource();
        CancellationToken cancellationToken = cancellationTokenSource.Token;

        // Producer
        Task.Factory.StartNew(() =>
        {
            // Random is not thread-safe, so each task creates its own instance.
            var rnd = new Random(Guid.NewGuid().GetHashCode());
            try
            {
                while (true)
                {
                    int index = rnd.Next(0, maxBoxes);
                    // Put the letter in mailbox 'index'. The factory overload avoids
                    // allocating a new BlockingCollection when the box already exists.
                    var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
                    box.Add("some message " + index, cancellationToken);
                    Console.WriteLine("Produced a letter to put in box " + index);

                    // Simulate an item that is expensive to produce.
                    Thread.Sleep(1000);
                }
            }
            catch (OperationCanceledException) { /* cancelled: let the task end */ }
        });

        // Consumer 1
        Task.Factory.StartNew(() =>
        {
            var rnd = new Random(Guid.NewGuid().GetHashCode());
            try
            {
                while (true)
                {
                    int index = rnd.Next(0, maxBoxes);
                    // Get a letter from mailbox 'index'; Take blocks until one arrives.
                    var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
                    var message = box.Take(cancellationToken);
                    Console.WriteLine("Consumed 1: " + message);

                    // Consuming an item costs less than producing it.
                    Thread.Sleep(50);
                }
            }
            catch (OperationCanceledException) { /* cancelled: let the task end */ }
        });

        // Consumer 2
        Task.Factory.StartNew(() =>
        {
            var rnd = new Random(Guid.NewGuid().GetHashCode());
            try
            {
                while (true)
                {
                    int index = rnd.Next(0, maxBoxes);
                    // Get a letter from mailbox 'index'; Take blocks until one arrives.
                    var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
                    var message = box.Take(cancellationToken);
                    Console.WriteLine("Consumed 2: " + message);

                    // Consuming an item costs less than producing it.
                    Thread.Sleep(50);
                }
            }
            catch (OperationCanceledException) { /* cancelled: let the task end */ }
        });

        Console.ReadLine();
        cancellationTokenSource.Cancel();

This way, a consumer that is expecting something in a particular mailbox (say, mailbox 3) will wait until the producer puts a letter in that mailbox.
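
For the scenario described in the question's comments (workers finish in arbitrary order, the consumer must process keys 0, 1, 2, ... in sequence), the same idea might look roughly like the sketch below. The names `OrderedMailboxDemo` and `Run` are made up for illustration, and the producer simply publishes keys in reverse order to simulate out-of-order completion. Treat it as a sketch under those assumptions, not a tuned implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class OrderedMailboxDemo
{
    // Hypothetical helper: returns the values in the order the consumer received them.
    public static List<string> Run(int itemCount)
    {
        var mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
        var consumed = new List<string>(); // touched only by the consumer task

        // Producer: publishes keys in reverse order (assumption, to force out-of-order arrival).
        Task producer = Task.Run(() =>
        {
            for (int key = itemCount - 1; key >= 0; key--)
            {
                var box = mailBoxes.GetOrAdd(key, _ => new BlockingCollection<string>());
                box.Add("result " + key);
            }
        });

        // Consumer: asks for key 0, then 1, and so on. Take() blocks until
        // the mailbox for the expected key contains an item.
        Task consumer = Task.Run(() =>
        {
            for (int expected = 0; expected < itemCount; expected++)
            {
                var box = mailBoxes.GetOrAdd(expected, _ => new BlockingCollection<string>());
                consumed.Add(box.Take());
                // Each key is used once here, so the mailbox can be dropped.
                mailBoxes.TryRemove(expected, out _);
            }
        });

        Task.WaitAll(producer, consumer);
        return consumed;
    }

    public static void Main()
    {
        // Prints: result 0, result 1, result 2, result 3, result 4
        Console.WriteLine(string.Join(", ", Run(5)));
    }
}
```

Both sides call GetOrAdd, so it does not matter whether the producer or the consumer reaches a given key first; they end up sharing the same BlockingCollection for that key.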

Bernstein answered 30/1, 2013 at 14:48 Comment(0)
A
1

You'll need to write your own adapter class - something like:

public class ConcurrentDictionaryWrapper<TKey,TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey,TValue>>
{
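    // Initialized here; otherwise every member would throw a NullReferenceException.
    private readonly ConcurrentDictionary<TKey, TValue> dictionary =
        new ConcurrentDictionary<TKey, TValue>();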

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return dictionary.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        throw new NotImplementedException();
    }

    public int Count
    {
        get { return dictionary.Count; }
    }

    public object SyncRoot
    {
        get { return this; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        throw new NotImplementedException();
    }

    public bool TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return dictionary.TryAdd(item.Key, item.Value);
    }

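    public bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        // Remove any one entry. Enumerating a ConcurrentDictionary is safe during
        // concurrent changes; if another thread removes the entry first, try the next.
        foreach (var pair in dictionary)
        {
            TValue value;
            if (dictionary.TryRemove(pair.Key, out value))
            {
                item = new KeyValuePair<TKey, TValue>(pair.Key, value);
                return true;
            }
        }
        // Empty dictionary: report failure instead of passing a default (possibly null) key.
        item = default(KeyValuePair<TKey, TValue>);
        return false;
    }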

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        throw new NotImplementedException();
    }
}
Alimentation answered 24/5, 2012 at 13:19 Comment(1)
Thanks for the code suggestion. But my main purpose in using a BlockingCollection was the ability to mark the collection as adding-completed, and to check that status as well as whether it is both adding-completed and empty, as BlockingCollection provides. I am aware I can easily add such functionality myself, but I am looking for a suggestion on how to do it directly through the BlockingCollection. So far I do not see a reason why it can't work through the BlockingCollection directly. Maybe it only takes IProducerConsumerCollection<T>? - Malarkey
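
On the question in the comment above: the BlockingCollection<T> constructor does accept only an IProducerConsumerCollection<T>, and ConcurrentDictionary does not implement that interface, which is why an adapter is needed. Once an adapter is wrapped, CompleteAdding, IsCompleted and GetConsumingEnumerable come from BlockingCollection itself. A minimal sketch, where `DictionaryAdapter` and `AdapterDemo` are hypothetical names, only unique keys are added, and items come out in no particular order:

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical minimal adapter, for illustration only.
public class DictionaryAdapter<TKey, TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
{
    private readonly ConcurrentDictionary<TKey, TValue> _dictionary =
        new ConcurrentDictionary<TKey, TValue>();

    public bool TryAdd(KeyValuePair<TKey, TValue> item)
        => _dictionary.TryAdd(item.Key, item.Value);

    public bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        // Remove any one entry; skip entries another thread removed first.
        foreach (var pair in _dictionary)
        {
            if (_dictionary.TryRemove(pair.Key, out TValue value))
            {
                item = new KeyValuePair<TKey, TValue>(pair.Key, value);
                return true;
            }
        }
        item = default(KeyValuePair<TKey, TValue>);
        return false;
    }

    public int Count => _dictionary.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public KeyValuePair<TKey, TValue>[] ToArray() => _dictionary.ToArray();
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        => ((ICollection<KeyValuePair<TKey, TValue>>)_dictionary).CopyTo(array, index);
    public void CopyTo(Array array, int index)
        => ((ICollection)_dictionary).CopyTo(array, index);
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        => _dictionary.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class AdapterDemo
{
    // Returns the number of items consumed, or -1 if IsCompleted is unexpectedly false.
    public static int Run()
    {
        var collection = new BlockingCollection<KeyValuePair<int, string>>(
            new DictionaryAdapter<int, string>());

        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 3; i++)
                collection.Add(new KeyValuePair<int, string>(i, "value " + i));
            collection.CompleteAdding(); // no more items will be added
        });

        int taken = 0;
        // The loop ends once adding is completed and the collection is empty.
        foreach (var pair in collection.GetConsumingEnumerable())
            taken++;

        producer.Wait();
        return collection.IsCompleted ? taken : -1;
    }

    public static void Main() => Console.WriteLine(Run()); // prints 3
}
```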
F
0

Here is an implementation of an IProducerConsumerCollection<T> that is backed by a ConcurrentDictionary<TKey, TValue>. The T of the collection is KeyValuePair<TKey, TValue>. It is very similar to Nick Jones's implementation, with some improvements:

public class ConcurrentDictionaryProducerConsumer<TKey, TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
{
    private readonly ConcurrentDictionary<TKey, TValue> _dictionary;
    private readonly ThreadLocal<IEnumerator<KeyValuePair<TKey, TValue>>> _enumerator;

    public ConcurrentDictionaryProducerConsumer(
        IEqualityComparer<TKey> comparer = default)
    {
        _dictionary = new(comparer);
        _enumerator = new(() => _dictionary.GetEnumerator());
    }

    public bool TryAdd(KeyValuePair<TKey, TValue> entry)
    {
        if (!_dictionary.TryAdd(entry.Key, entry.Value))
            throw new DuplicateKeyException();
        return true;
    }

    public bool TryTake(out KeyValuePair<TKey, TValue> entry)
    {
        // Get a cached enumerator that is used only by the current thread.
        IEnumerator<KeyValuePair<TKey, TValue>> enumerator = _enumerator.Value;
        while (true)
        {
            enumerator.Reset();
            if (!enumerator.MoveNext())
                throw new InvalidOperationException();
            entry = enumerator.Current;
            if (!_dictionary.TryRemove(entry)) continue;
            return true;
        }
    }

    public int Count => _dictionary.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public KeyValuePair<TKey, TValue>[] ToArray() => _dictionary.ToArray();
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        => _dictionary.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}

public class DuplicateKeyException : InvalidOperationException { }

Usage example:

BlockingCollection<KeyValuePair<string, Item>> collection
    = new(new ConcurrentDictionaryProducerConsumer<string, Item>());

//...

try { collection.Add(KeyValuePair.Create(key, item)); }
catch (DuplicateKeyException) { Console.WriteLine($"The {key} was rejected."); }

The collection.TryTake method removes a practically random key from the ConcurrentDictionary, which is unlikely to be desirable behavior. The performance is also not great, and the memory allocations are significant. For these reasons I can't enthusiastically recommend the above implementation. I would suggest instead taking a look at the ConcurrentQueueNoDuplicates<T> that I have posted here, which has proper queue behavior.

Caution: Calling collection.TryAdd(item) does not have the expected behavior of returning false if the key exists. Any attempt to add a duplicate key invariably results in a DuplicateKeyException. For an explanation, see the aforementioned other post.
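
To see the underlying BlockingCollection behavior in isolation: when the wrapped collection's TryAdd returns false, BlockingCollection<T>.TryAdd does not return false but throws an InvalidOperationException. The sketch below uses a hypothetical `PickyQueue` that refuses negative numbers (an arbitrary rule chosen only for the demonstration), built by re-implementing IProducerConsumerCollection<int>.TryAdd on top of ConcurrentQueue<int>.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical collection whose TryAdd rejects negative numbers.
public class PickyQueue : ConcurrentQueue<int>, IProducerConsumerCollection<int>
{
    bool IProducerConsumerCollection<int>.TryAdd(int item)
    {
        if (item < 0) return false; // rejected by the underlying collection
        Enqueue(item);
        return true;
    }
}

public static class TryAddDemo
{
    public static string Run()
    {
        var collection = new BlockingCollection<int>(new PickyQueue());
        bool accepted = collection.TryAdd(1); // accepted by PickyQueue

        try
        {
            collection.TryAdd(-1); // PickyQueue returns false here
            return "returned without throwing";
        }
        catch (InvalidOperationException)
        {
            return accepted ? "threw InvalidOperationException" : "unexpected";
        }
    }

    public static void Main() => Console.WriteLine(Run());
    // prints: threw InvalidOperationException
}
```

This is why the implementation above throws a dedicated exception type from its own TryAdd: the caller can then catch the duplicate-key case specifically rather than a generic InvalidOperationException.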

Fineberg answered 5/5, 2022 at 13:44 Comment(0)
