Concurrent collection with priority
Asked Answered
B

3

1

I have a number of threads which retrieve data from the list of servers. The list of servers are downloaded from the server resolver every 5 mins. My threads for processing data should use only the server that has minimal response time. The response time of each server can be significantly different from request to request. So in time frame between updating a list of servers I should verify a response time from each server.

My initial approach was create two additional threads: The first to update a server list, the second to verify a response time from each server and sort a list of servers according to the response time of them.

I tried to use BlockingCollection<T> which was designed to connect producers and consumers, but in my task I have two concurrent consumers and also BlockingCollection<T> doesn't have native ability to insert items for creating a prioritized list of servers.

ConcurrentStack<T> or ConcurrentQueue<T> also cannot be used as is because they are non-blocking like as BlockingCollection<T> and they require additional mechanisms of blocking threads which require items from queues.

Please help me solve this task.

Bernat answered 5/5, 2014 at 10:26 Comment(2)
Keep it simple to start with. Wrap your collection access with another class entirely. Do not allow the rest of your code to directly interact with the collection. In your wrapping class, use basic lock blocks to control the sorting/adding/reading. EDIT: When reading the whole collection, feel free to lock and create a copy of the collection to return to the caller if necessary.Bicentennial
I'm working in this way, but I would like to listen alternative approaches if it's possible. In any way, thanks.Bernat
E
5

With the advent of .NET 6, a new class PriorityQueue<TElement, TPriority> has become available. This is not a thread-safe collection, but nevertheless it can be used quite easily as the backing storage of a IProducerConsumerCollection<T> implementation, which in turn can become the underlying data store of a BlockingCollection<T> class. Below is such a implementation, containing the minimal logic required in order to do the job:

public class ProducerConsumerPriorityQueue<TElement, TPriority>
    : IProducerConsumerCollection<(TElement, TPriority)>
{
    private readonly PriorityQueue<TElement, (TPriority, long)> _queue;
    private long _index = 0;

    public ProducerConsumerPriorityQueue(IComparer<TPriority> comparer = default)
    {
        comparer ??= Comparer<TPriority>.Default;
        _queue = new(Comparer<(TPriority, long)>.Create((x, y) =>
        {
            int result = comparer.Compare(x.Item1, y.Item1);
            if (result == 0) result = x.Item2.CompareTo(y.Item2);
            return result;
        }));
    }

    public int Count { get { lock (_queue) return _queue.Count; } }

    public bool TryAdd((TElement, TPriority) item)
    {
        lock (_queue) _queue.Enqueue(item.Item1, (item.Item2, ++_index));
        return true;
    }

    public bool TryTake(out (TElement, TPriority) item)
    {
        lock (_queue)
        {
            if (_queue.TryDequeue(out var element, out var priority))
            {
                item = (element, priority.Item1); return true;
            }
            item = default; return false;
        }
    }

    public bool IsSynchronized => false;

    public object SyncRoot => throw new NotSupportedException();

    public (TElement, TPriority)[] ToArray()
        => throw new NotSupportedException();

    public void CopyTo((TElement, TPriority)[] array, int index)
        => throw new NotSupportedException();

    public void CopyTo(Array array, int index)
        => throw new NotSupportedException();

    public IEnumerator<(TElement, TPriority)> GetEnumerator()
        => throw new NotSupportedException();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

Only the members Count, TryAdd and TryTake are implemented, but they are enough. They are implemented in a thread-safe way, because it is required by the documentation of the BlockingCollection<T> class:

IProducerConsumerCollection<T> represents a collection that allows for thread-safe adding and removal of data.

Usage example:

var collection = new BlockingCollection<(string Server, int Priority)>(
    new ProducerConsumerPriorityQueue<string, int>());

collection.Add(("Server-A", 20));
collection.Add(("Server-B", 10));
collection.Add(("Server-C", 20));
collection.CompleteAdding();

foreach (var (server, priority) in collection.GetConsumingEnumerable())
{
    Console.WriteLine($"Server: {server}, Priority: {priority}");
}

Output:

Server: Server-B, Priority: 10
Server: Server-A, Priority: 20
Server: Server-C, Priority: 20

Online demo.

The insertion order for items with equal priority is preserved.

Erudition answered 15/3, 2022 at 14:8 Comment(0)
F
1

Consider using two or more BlockingCollections and use the TakeFromAny to listen to all of the queues. The method (although I didn't see it mentioned in the documentation) prefers to take elements from the first queue in the array of queues it is listening to.

Frederickson answered 1/10, 2017 at 8:28 Comment(2)
Do you have any more information about this 'preference'? Will it always take from the first queue if it contains elements?Forint
unfortunately this is only backed up by testing and from browsing the .Net source code. I couldn't find any detailed documentation... I would appreciate if you update me if you find anything contrary.Frederickson
I
0

Unfortunately this is not built in and you might need to implement your own. To help you get started there is already an implementation of this in msdn samples which can be used with BlockingCollection.

http://blogs.msdn.com/pfxteam/archive/2010/04/04/9990342.aspx

For an efficient implementation you need to base it on heap data structure. Here is a nice article though the class does not implement IProducerConsumerCollection:

http://www.codeproject.com/Articles/126751/Priority-queue-in-C-with-the-help-of-heap-data-str

Incomputable answered 5/5, 2014 at 15:11 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.