How might a class like .NET's ConcurrentBag<T> be implemented?

I find myself very intrigued by the existence of a ConcurrentBag<T> class in the upcoming .NET 4.0 framework:

Bags are useful for storing objects when ordering doesn't matter, and unlike sets, bags support duplicates.

My question is: how might this idea be implemented? Most collections I'm familiar with essentially amount to (under the hood) some form of array, in which order may not "matter," but there is an order. That's why enumerating an unchanged collection, be it a List, Queue, Stack, etc., will pretty much always visit the items in the same sequence, even though it doesn't need to.

If I had to guess, I might suggest that internally it could be a Dictionary<T, LinkedList<T>>; but that actually seems quite dubious considering it wouldn't make sense to use just any type T as a key.

What I'm expecting/hoping is that this is actually an established object type that has already been "figured out" somewhere, and that somebody who knows of this established type can tell me about it. It's just so unusual to me--one of those concepts that's easy to understand in real life, but is difficult to translate into a usable class as a developer--which is why I'm curious as to the possibilities.

EDIT:

Some responders have suggested that a Bag could be a form of a hashtable internally. This was my initial thought as well, but I foresaw two problems with this idea:

  1. A hashtable is not all that useful when you don't have a suitable hashcode function for the type in question.
  2. Simply tracking an object's "count" in a collection is not the same as storing the object.

As Meta-Knight suggested, perhaps an example would make this clearer:

using System.Collections.Concurrent;
using System.Threading;

public class ExpensiveObject {
    private ExpensiveObject() {
        // very intense operations happening in here
    }

    // static, so it can be called as ExpensiveObject.CreateExpensiveObject()
    // even though the constructor is private
    public static ExpensiveObject CreateExpensiveObject() {
        return new ExpensiveObject();
    }
}

public static class Program {
    static void Main() {
        var expensiveObjects = new ConcurrentBag<ExpensiveObject>();

        for (int i = 0; i < 5; i++) {
            expensiveObjects.Add(ExpensiveObject.CreateExpensiveObject());
        }

        // after this point in the code, I want to believe I have 5 new
        // expensive objects in my collection

        while (expensiveObjects.Count > 0) {
            ExpensiveObject expObj = null;
            bool objectTaken = expensiveObjects.TryTake(out expObj);
            if (objectTaken) {
                // here I THINK I am queueing a particular operation to be
                // executed on 5 separate threads for 5 separate objects,
                // but if ConcurrentBag is a hashtable then I've just received
                // the object 5 times and so I am working on the same object
                // from 5 threads at the same time!
                ThreadPool.QueueUserWorkItem(DoWorkOnExpensiveObject, expObj);
            } else {
                break;
            }
        }
    }

    static void DoWorkOnExpensiveObject(object obj) {
        ExpensiveObject expObj = obj as ExpensiveObject;
        if (expObj != null) {
            // some work to be done
        }
    }
}
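The worry in the comments above can be checked directly against the real class. This sketch uses only ConcurrentBag<T>'s public Add and TryTake; the helper names (BagCheck, DistinctInstancesTaken, TakesOfSameInstance) are hypothetical, and a plain `new object()` stands in for ExpensiveObject. If ConcurrentBag<T> stores every added object, as the answers below describe, draining 5 freshly created objects should yield 5 distinct instances, and adding one instance 3 times should allow 3 successful takes.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BagCheck
{
    // Adds `count` freshly constructed objects, drains the bag with TryTake,
    // and returns how many distinct instances came back out.
    public static int DistinctInstancesTaken(int count)
    {
        var bag = new ConcurrentBag<object>();
        for (int i = 0; i < count; i++)
        {
            bag.Add(new object()); // stand-in for ExpensiveObject.CreateExpensiveObject()
        }

        // HashSet<object> uses reference equality here, so it counts distinct instances.
        var seen = new HashSet<object>();
        while (bag.TryTake(out object item))
        {
            seen.Add(item);
        }
        return seen.Count;
    }

    // Adds the SAME instance `times` times and counts how many TryTake calls succeed.
    public static int TakesOfSameInstance(int times)
    {
        var bag = new ConcurrentBag<object>();
        var shared = new object();
        for (int i = 0; i < times; i++)
        {
            bag.Add(shared);
        }

        int takes = 0;
        while (bag.TryTake(out object item) && ReferenceEquals(item, shared))
        {
            takes++;
        }
        return takes;
    }
}
```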
Modillion asked 6/11, 2009 at 16:50 Comment(4)
+1, as it is good to know this class exists. - Plasma
Dan-o: Your 5-line comment in your example code makes no sense. Of course you have 5 independent objects in the bag at that point. The "new" operator in [public ExpensiveObject CreateExpensiveObject()] guarantees that. - Interlard
@Boogaloo: Take a look at Meta-Knight's and flyfishr64's responses. They are suggesting that a Bag could be implemented as a HashTable with objects as keys and values set to the number of occurrences of the associated key. If this were the case, then "adding" an object would be the same as incrementing this value (the # of occurrences) by one, and "removing" an object would return the object and simply decrement this value. You are right that CreateExpensiveObject would guarantee that an object is created, but not that it is added to the Bag, if the implementation were a HashTable. - Modillion
Mmm, my mistake. I have not used hashes in the past. I had assumed that the default hash generator would create a unique hash value per object, which you could override with your own hash generator. Don't mind me. :) - Interlard

If you look at the details of ConcurrentBag<T>, you'll find that it's, internally, basically a customized linked list.

Since bags can contain duplicates and are not accessible by index, a doubly linked list is a very good option for the implementation. It allows locking to be fairly fine-grained for insertion and removal (you don't have to lock the entire collection, just the nodes around where you're inserting or removing). And since you're not worried about duplicates, no hashing is involved. This makes a doubly linked list perfect.
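To make the linked-list idea concrete, here is a minimal, single-threaded sketch. The names (LinkedBag<T>, TryTake, TrySteal) are hypothetical and this is not the real ConcurrentBag<T>.ThreadLocalList; locking is deliberately left out. It only shows two things: items are never compared or hashed, so duplicates cost nothing, and keeping both a head and a tail pointer with Prev links allows O(1) removal from either end (an "owner" working at the head, a "thief" at the tail).

```csharp
#nullable enable

// Hypothetical, single-threaded sketch: NOT the real ConcurrentBag<T> internals.
// Items are never compared or hashed, so duplicates are stored like anything else.
public sealed class LinkedBag<T>
{
    private sealed class Node
    {
        public readonly T Value;
        public Node? Prev;
        public Node? Next;
        public Node(T value) => Value = value;
    }

    private Node? _head;
    private Node? _tail;

    public int Count { get; private set; }

    // New items go in at the head: O(1), no search.
    public void Add(T item)
    {
        var node = new Node(item) { Next = _head };
        if (_head != null) _head.Prev = node; else _tail = node;
        _head = node;
        Count++;
    }

    // The "owner" side removes from the head (most recently added first).
    public bool TryTake(out T item)
    {
        Node? node = _head;
        if (node == null) { item = default!; return false; }
        _head = node.Next;
        if (_head != null) _head.Prev = null; else _tail = null;
        Count--;
        item = node.Value;
        return true;
    }

    // A "thief" removes from the tail; the Prev link lets the new tail
    // be found without walking the list.
    public bool TrySteal(out T item)
    {
        Node? node = _tail;
        if (node == null) { item = default!; return false; }
        _tail = node.Prev;
        if (_tail != null) _tail.Next = null; else _head = null;
        Count--;
        item = node.Value;
        return true;
    }
}
```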

Saurel answered 6/11, 2009 at 16:56 Comment(5)
Good point: I didn't even think about the fact that a bag does not need to do any matching; it just takes objects and returns them. What seemed to me like a truly confounding problem suddenly seems not so puzzling. - Modillion
Since some other responders have different answers (though yours makes the most sense to me), I'd be curious to know where you found these details. Also, that's a great point about only having to lock the nodes where insertion/removal is happening. Is ConcurrentQueue<T>, then, actually a LinkedList<T> on the inside as well? (Otherwise it seems that enqueuing/dequeuing would be needlessly expensive.) - Modillion
You can use Reflector on System.dll in .NET 4 beta 2; you'll see the complete implementation. It's not actually a LinkedList<T>, but rather an internal ConcurrentBag<T>.ThreadLocalList using a ConcurrentBag<T>.Node, but it's basically a customized doubly linked list. - Saurel
Also, to add: as I understand it, the implementation keeps a separate collection for each thread, and only "steals" an item from another thread if the current thread's collection is empty. This "stealing" requires thread synchronization, which brings its own performance penalty. Hence, ConcurrentBag is efficient mostly when the threads that Add are also the ones that Take, which minimizes the performance hit. - Blancmange
I was wondering why it was a doubly linked list, and I can see from here referencesource.microsoft.com/#system/sys/system/collections/… that the double linking is used only to make adding nodes at the beginning of the list easy, and also for the "stealing". - Blancmange

There's some good info on ConcurrentBag here: http://geekswithblogs.net/BlackRabbitCoder/archive/2011/03/03/c.net-little-wonders-concurrentbag-and-blockingcollection.aspx

The way that the ConcurrentBag works is to take advantage of the new ThreadLocal type (new in System.Threading for .NET 4.0) so that each thread using the bag has a list local to just that thread.

This means that adding to or removing from a thread-local list requires very little synchronization. The problem comes in when a thread goes to consume an item but its local list is empty. In this case the bag performs “work-stealing”, robbing an item from another thread that has items in its list. This requires a higher level of synchronization, which adds a bit of overhead to the take operation.
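A rough sketch of the per-thread-list-plus-stealing scheme described above, assuming ThreadLocal<T> is constructed with trackAllValues: true (available from .NET 4.5) so that a thread whose list is empty can enumerate the other threads' lists through Values. StealingBag<T> is a hypothetical name, and this is much simpler than the real class: each per-thread LinkedList<T> is guarded by an ordinary lock, which the owner usually acquires uncontended, whereas the real implementation is considerably more optimized. The owner adds and takes at the head and thieves take from the tail, matching the doubly-linked-list reasoning in the comments above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch of "one list per thread + work stealing"; far simpler
// (and slower) than the real ConcurrentBag<T>.
public sealed class StealingBag<T> : IDisposable
{
    // trackAllValues: true exposes every thread's list through _local.Values.
    private readonly ThreadLocal<LinkedList<T>> _local =
        new ThreadLocal<LinkedList<T>>(() => new LinkedList<T>(), trackAllValues: true);

    public void Add(T item)
    {
        LinkedList<T> mine = _local.Value!;
        lock (mine)
        {
            mine.AddFirst(item); // the owning thread works at the head
        }
    }

    public bool TryTake(out T item)
    {
        LinkedList<T> mine = _local.Value!;
        lock (mine) // besides the owner, only thieves ever lock this list
        {
            if (mine.First != null)
            {
                item = mine.First.Value;
                mine.RemoveFirst();
                return true;
            }
        }

        // Local list is empty: steal from the tail of another thread's list.
        foreach (LinkedList<T> other in _local.Values)
        {
            if (ReferenceEquals(other, mine)) continue;
            lock (other)
            {
                if (other.Last != null)
                {
                    item = other.Last.Value;
                    other.RemoveLast();
                    return true;
                }
            }
        }

        item = default!;
        return false;
    }

    public void Dispose() => _local.Dispose();
}
```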

Ewold answered 5/3, 2011 at 14:45 Comment(0)

Since ordering doesn't matter, a ConcurrentBag could be using a hashtable behind the scenes to allow for fast retrieval of data. But unlike a HashSet, a bag accepts duplicates. Maybe each item could be paired with a Count property which is set to 1 when the item is added. If you add the same item a second time, you could just increment the Count property of that item.

Then, to remove an item which has a count greater than one, you could just decrease the Count for that item. If the count is one, you would remove the Item-Count pair from the hashtable.
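The count-per-item idea might look roughly like this. CountingBag<T> and Occurrences are hypothetical names; the sketch is not thread-safe and is not how ConcurrentBag<T> is built. What counts as a "duplicate" is decided entirely by the equality comparer passed to the Dictionary (EqualityComparer<T>.Default unless another one is supplied).

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Hypothetical counting bag: one dictionary entry per distinct item plus an
// occurrence count. Not thread-safe, and not how ConcurrentBag<T> is built.
public sealed class CountingBag<T> where T : notnull
{
    private readonly Dictionary<T, int> _counts;

    public CountingBag(IEqualityComparer<T>? comparer = null)
    {
        // Default comparer: Equals/GetHashCode of T (reference identity for
        // classes that don't override them).
        _counts = new Dictionary<T, int>(comparer ?? EqualityComparer<T>.Default);
    }

    public int Count { get; private set; }

    public void Add(T item)
    {
        _counts.TryGetValue(item, out int n); // n stays 0 for a new item
        _counts[item] = n + 1;
        Count++;
    }

    // Hands back the stored key instance and decrements its count,
    // removing the entry when the count drops to zero.
    public bool TryTake(out T item)
    {
        if (_counts.Count == 0)
        {
            item = default!;
            return false;
        }

        KeyValuePair<T, int> entry = _counts.First(); // any entry will do: a bag is unordered
        item = entry.Key;
        if (entry.Value == 1)
        {
            _counts.Remove(entry.Key);
        }
        else
        {
            _counts[entry.Key] = entry.Value - 1;
        }
        Count--;
        return true;
    }

    public int Occurrences(T item) => _counts.TryGetValue(item, out int n) ? n : 0;
}
```

With the default comparer, the five ExpensiveObject instances in the question's example would be five separate keys with a count of 1 each; only items that compare equal (such as two equal strings) collapse into one entry, and TryTake then hands back the single stored instance.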

Commiserate answered 6/11, 2009 at 17:3 Comment(4)
It sounds like you and I had similar ideas, but consider this: first, this would restrict the use of ConcurrentBag<T> to types that are suitable to be used as keys. Second, if you simply have a Count property on each entry in the HashSet, then the objects aren't really in the bag, which I feel basically defeats the purpose. (So if I have 10 copies of the same Thing in my ConcurrentBag<Thing>, and I call TryTake 10 times, what gets returned after the first time? The same Thing? Then I think I have 10 objects but really I have just 1.) - Modillion
If two items are considered duplicates in the ConcurrentBag, then aren't they the same? If they're the same, isn't it expected that if you call TryTake several times you will get the exact same object? I'm not sure how a ConcurrentBag could work well with "types that aren't suitable to be used as keys"... - Commiserate
If you had a concrete example, it would help me a lot to visualize the problem you mention ;-) - Commiserate
@Meta-Knight: I added a pretty detailed example to my question. Let me know what you think. - Modillion

Well, in Smalltalk (where the notion of a Bag came from), the collection is basically the same as a hash, albeit one that allows duplicates. Instead of storing the duplicate object, though, it maintains an "occurrence count", i.e., something like a refcount for each object. If ConcurrentBag is a faithful implementation, this should give you a starting point.

Papilionaceous answered 6/11, 2009 at 17:3 Comment(0)

I believe the concept of a 'Bag' is synonymous with 'Multiset'.

There are a number of "Bag"/"Multiset" implementations (these happen to be Java) that are open source, if you are interested in how they are implemented.

These implementations show that a 'Bag' can be implemented in any number of ways depending on your needs. There are examples of TreeMultiset, HashMultiset, LinkedHashMultiset, and ConcurrentHashMultiset.

Google Collections
Google has a number of "Multiset" implementations, one being a ConcurrentHashMultiset.

Apache Commons
Apache has a number of "Bag" implementations.
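For the concurrent flavor, a C# analogue of the same counting idea can lean on ConcurrentDictionary<TKey, TValue>: AddOrUpdate bumps a count atomically, and removal is a compare-and-swap loop built from TryUpdate and the TryRemove(KeyValuePair<TKey, TValue>) overload (available from .NET 5 onward). ConcurrentCountingBag<T> is a hypothetical name; this is neither a port of Guava's ConcurrentHashMultiset nor related to how ConcurrentBag<T> works.

```csharp
#nullable enable
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical thread-safe counting bag built on ConcurrentDictionary.
// Requires .NET 5+ for the TryRemove(KeyValuePair<TKey, TValue>) overload.
public sealed class ConcurrentCountingBag<T> where T : notnull
{
    private readonly ConcurrentDictionary<T, int> _counts = new ConcurrentDictionary<T, int>();

    // Atomically inserts the item with count 1, or increments its count.
    public int Add(T item) => _counts.AddOrUpdate(item, 1, (_, n) => n + 1);

    // Atomically decrements the count, removing the entry when it would reach 0.
    public bool TryRemove(T item)
    {
        while (true)
        {
            if (!_counts.TryGetValue(item, out int n))
            {
                return false; // no occurrences left
            }

            bool done = n == 1
                ? _counts.TryRemove(new KeyValuePair<T, int>(item, 1)) // only if still 1
                : _counts.TryUpdate(item, n - 1, n);                   // only if still n
            if (done)
            {
                return true;
            }
            // Another thread changed the count in between: re-read and retry.
        }
    }

    public int Occurrences(T item) => _counts.TryGetValue(item, out int n) ? n : 0;
}
```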

Fortyish answered 6/11, 2009 at 17:14 Comment(0)

The System.Collections.Concurrent namespace is now open source, and the implementation for ConcurrentBag can now be found here:

https://github.com/dotnet/runtime/blob/main/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentBag.cs

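Below is the code that was pasted as of Jan 30, 2022 (MIT licensed). Note, though, that it is actually the source of System.Collections.ObjectModel.KeyedCollection<TKey, TItem>, not ConcurrentBag<T>; the ConcurrentBag<T> implementation is in the ConcurrentBag.cs file linked above.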

// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

namespace System.Collections.ObjectModel
{
    [Serializable]
    [DebuggerTypeProxy(typeof(CollectionDebugView<>))]
    [DebuggerDisplay("Count = {Count}")]
    [System.Runtime.CompilerServices.TypeForwardedFrom("mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089")]
    public abstract class KeyedCollection<TKey, TItem> : Collection<TItem> where TKey: notnull
    {
        private const int DefaultThreshold = 0;

        private readonly IEqualityComparer<TKey> comparer; // Do not rename (binary serialization)
        private Dictionary<TKey, TItem>? dict; // Do not rename (binary serialization)
        private int keyCount; // Do not rename (binary serialization)
        private readonly int threshold; // Do not rename (binary serialization)

        protected KeyedCollection() : this(null, DefaultThreshold)
        {
        }

        protected KeyedCollection(IEqualityComparer<TKey>? comparer) : this(comparer, DefaultThreshold)
        {
        }

        protected KeyedCollection(IEqualityComparer<TKey>? comparer, int dictionaryCreationThreshold)
            : base(new List<TItem>()) // Be explicit about the use of List<T> so we can foreach over
                                      // Items internally without enumerator allocations.
        {
            if (dictionaryCreationThreshold < -1)
            {
                throw new ArgumentOutOfRangeException(nameof(dictionaryCreationThreshold), SR.ArgumentOutOfRange_InvalidThreshold);
            }

            this.comparer = comparer ?? EqualityComparer<TKey>.Default;
            threshold = dictionaryCreationThreshold == -1 ? int.MaxValue : dictionaryCreationThreshold;
        }

        /// <summary>
        /// Enables the use of foreach internally without allocations using <see cref="List{T}"/>'s struct enumerator.
        /// </summary>
        private new List<TItem> Items
        {
            get
            {
                Debug.Assert(base.Items is List<TItem>);
                return (List<TItem>)base.Items;
            }
        }

        public IEqualityComparer<TKey> Comparer => comparer;

        public TItem this[TKey key]
        {
            get
            {
                TItem item;
                if (TryGetValue(key, out item!))
                {
                    return item;
                }

                throw new KeyNotFoundException(SR.Format(SR.Arg_KeyNotFoundWithKey, key.ToString()));
            }
        }

        public bool Contains(TKey key)
        {
            if (key == null)
            {
                throw new ArgumentNullException(nameof(key));
            }

            if (dict != null)
            {
                return dict.ContainsKey(key);
            }

            foreach (TItem item in Items)
            {
                if (comparer.Equals(GetKeyForItem(item), key))
                {
                    return true;
                }
            }

            return false;
        }

        public bool TryGetValue(TKey key, [MaybeNullWhen(false)] out TItem item)
        {
            if (key == null)
            {
                throw new ArgumentNullException(nameof(key));
            }

            if (dict != null)
            {
                return dict.TryGetValue(key, out item!);
            }

            foreach (TItem itemInItems in Items)
            {
                TKey keyInItems = GetKeyForItem(itemInItems);
                if (keyInItems != null && comparer.Equals(key, keyInItems))
                {
                    item = itemInItems;
                    return true;
                }
            }

            item = default;
            return false;
        }

        private bool ContainsItem(TItem item)
        {
            TKey key;
            if ((dict == null) || ((key = GetKeyForItem(item)) == null))
            {
                return Items.Contains(item);
            }

            TItem itemInDict;
            if (dict.TryGetValue(key, out itemInDict!))
            {
                return EqualityComparer<TItem>.Default.Equals(itemInDict, item);
            }

            return false;
        }

        public bool Remove(TKey key)
        {
            if (key == null)
            {
                throw new ArgumentNullException(nameof(key));
            }

            if (dict != null)
            {
                TItem item;
                return dict.TryGetValue(key, out item!) && Remove(item);
            }

            for (int i = 0; i < Items.Count; i++)
            {
                if (comparer.Equals(GetKeyForItem(Items[i]), key))
                {
                    RemoveItem(i);
                    return true;
                }
            }

            return false;
        }

        protected IDictionary<TKey, TItem>? Dictionary => dict;

        protected void ChangeItemKey(TItem item, TKey newKey)
        {
            if (!ContainsItem(item))
            {
                throw new ArgumentException(SR.Argument_ItemNotExist, nameof(item));
            }

            TKey oldKey = GetKeyForItem(item);
            if (!comparer.Equals(oldKey, newKey))
            {
                if (newKey != null)
                {
                    AddKey(newKey, item);
                }
                if (oldKey != null)
                {
                    RemoveKey(oldKey);
                }
            }
        }

        protected override void ClearItems()
        {
            base.ClearItems();
            dict?.Clear();
            keyCount = 0;
        }

        protected abstract TKey GetKeyForItem(TItem item);

        protected override void InsertItem(int index, TItem item)
        {
            TKey key = GetKeyForItem(item);
            if (key != null)
            {
                AddKey(key, item);
            }

            base.InsertItem(index, item);
        }

        protected override void RemoveItem(int index)
        {
            TKey key = GetKeyForItem(Items[index]);
            if (key != null)
            {
                RemoveKey(key);
            }

            base.RemoveItem(index);
        }

        protected override void SetItem(int index, TItem item)
        {
            TKey newKey = GetKeyForItem(item);
            TKey oldKey = GetKeyForItem(Items[index]);

            if (comparer.Equals(oldKey, newKey))
            {
                if (newKey != null && dict != null)
                {
                    dict[newKey] = item;
                }
            }
            else
            {
                if (newKey != null)
                {
                    AddKey(newKey, item);
                }

                if (oldKey != null)
                {
                    RemoveKey(oldKey);
                }
            }

            base.SetItem(index, item);
        }

        private void AddKey(TKey key, TItem item)
        {
            if (dict != null)
            {
                dict.Add(key, item);
            }
            else if (keyCount == threshold)
            {
                CreateDictionary();
                dict!.Add(key, item);
            }
            else
            {
                if (Contains(key))
                {
                    throw new ArgumentException(SR.Format(SR.Argument_AddingDuplicate, key), nameof(key));
                }

                keyCount++;
            }
        }

        private void CreateDictionary()
        {
            dict = new Dictionary<TKey, TItem>(comparer);
            foreach (TItem item in Items)
            {
                TKey key = GetKeyForItem(item);
                if (key != null)
                {
                    dict.Add(key, item);
                }
            }
        }

        private void RemoveKey(TKey key)
        {
            Debug.Assert(key != null, "key shouldn't be null!");
            if (dict != null)
            {
                dict.Remove(key);
            }
            else
            {
                keyCount--;
            }
        }
    }
}
Dvorak answered 31/1, 2022 at 1:36 Comment(0)
