With the advent of .NET 6, a new class, PriorityQueue<TElement, TPriority>, has become available. It is not a thread-safe collection, but it can nevertheless be used quite easily as the backing storage of an IProducerConsumerCollection<T> implementation, which in turn can become the underlying data store of a BlockingCollection<T>. Below is such an implementation, containing the minimal logic required to do the job:
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class ProducerConsumerPriorityQueue<TElement, TPriority>
    : IProducerConsumerCollection<(TElement, TPriority)>
{
    // The long component of the key is an insertion index, used as a tie-breaker.
    private readonly PriorityQueue<TElement, (TPriority, long)> _queue;
    private long _index = 0;

    public ProducerConsumerPriorityQueue(IComparer<TPriority> comparer = default)
    {
        comparer ??= Comparer<TPriority>.Default;
        // Compare by priority first, then by insertion index.
        _queue = new(Comparer<(TPriority, long)>.Create((x, y) =>
        {
            int result = comparer.Compare(x.Item1, y.Item1);
            if (result == 0) result = x.Item2.CompareTo(y.Item2);
            return result;
        }));
    }

    public int Count { get { lock (_queue) return _queue.Count; } }

    public bool TryAdd((TElement, TPriority) item)
    {
        // _index is only modified while holding the lock.
        lock (_queue) _queue.Enqueue(item.Item1, (item.Item2, ++_index));
        return true;
    }

    public bool TryTake(out (TElement, TPriority) item)
    {
        lock (_queue)
        {
            if (_queue.TryDequeue(out var element, out var priority))
            {
                item = (element, priority.Item1);
                return true;
            }
            item = default;
            return false;
        }
    }

    // The members below are not needed for the usage shown here.
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public (TElement, TPriority)[] ToArray()
        => throw new NotSupportedException();
    public void CopyTo((TElement, TPriority)[] array, int index)
        => throw new NotSupportedException();
    public void CopyTo(Array array, int index)
        => throw new NotSupportedException();
    public IEnumerator<(TElement, TPriority)> GetEnumerator()
        => throw new NotSupportedException();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
Only the members Count, TryAdd and TryTake are implemented, but they are enough; the remaining members throw a NotSupportedException. The three implemented members are thread-safe, because this is required by the documentation of the BlockingCollection<T> class:
"IProducerConsumerCollection<T> represents a collection that allows for thread-safe adding and removal of data."
Usage example:
var collection = new BlockingCollection<(string Server, int Priority)>(
    new ProducerConsumerPriorityQueue<string, int>());
collection.Add(("Server-A", 20));
collection.Add(("Server-B", 10));
collection.Add(("Server-C", 20));
collection.CompleteAdding();
foreach (var (server, priority) in collection.GetConsumingEnumerable())
{
    Console.WriteLine($"Server: {server}, Priority: {priority}");
}
Output:
Server: Server-B, Priority: 10
Server: Server-A, Priority: 20
Server: Server-C, Priority: 20
The insertion order for items with equal priority is preserved.
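The stable ordering comes from pairing each priority with an incrementing index, so that ties are broken by arrival order. Here is a minimal standalone sketch of that idea, assuming a plain PriorityQueue with the default tuple comparer instead of the custom comparer used in the class above (the names queue, index, input and order are illustrative only):

```csharp
using System;
using System.Collections.Generic;

// The priority key is a (priority, index) tuple; the default tuple comparer
// compares the priority first and the index second.
var queue = new PriorityQueue<string, (int Priority, long Index)>();
long index = 0; // hypothetical insertion counter, playing the role of _index

var input = new[] { ("Server-A", 20), ("Server-B", 10), ("Server-C", 20) };
foreach (var (server, priority) in input)
    queue.Enqueue(server, (priority, ++index));

// Drain the queue and record the order in which the elements come out.
var order = new List<string>();
while (queue.TryDequeue(out var element, out var key))
    order.Add($"{element} (priority {key.Priority}, index {key.Index})");

Console.WriteLine(string.Join(Environment.NewLine, order));
// Server-B (priority 10, index 2)
// Server-A (priority 20, index 1)
// Server-C (priority 20, index 3)
```

Without the index component, PriorityQueue makes no promise about the relative order of Server-A and Server-C, since it does not guarantee first-in-first-out behavior for equal priorities.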
lock blocks to control the sorting/adding/reading. EDIT: When reading the whole collection, feel free to lock and create a copy of the collection to return to the caller if necessary. – Bicentennial
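The lock-and-copy suggestion could be applied to a member like ToArray roughly as follows. This is only a sketch under assumptions: SnapshotPriorityQueue is a hypothetical, reduced class (it is not the implementation above and does not implement IProducerConsumerCollection<T>), it relies on the default tuple comparer, and it sorts the copy because PriorityQueue.UnorderedItems exposes the items in no particular order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var queue = new SnapshotPriorityQueue<string, int>();
queue.Add("Server-A", 20);
queue.Add("Server-B", 10);
queue.Add("Server-C", 20);

// The snapshot is a detached copy, so it is enumerated without holding the lock.
foreach (var (server, priority) in queue.ToArray())
    Console.WriteLine($"Server: {server}, Priority: {priority}");
// Server: Server-B, Priority: 10
// Server: Server-A, Priority: 20
// Server: Server-C, Priority: 20

// Hypothetical reduced class: only the parts needed to illustrate lock-and-copy.
public class SnapshotPriorityQueue<TElement, TPriority>
{
    // Default comparer for (TPriority, long): priority first, then insertion index.
    private readonly PriorityQueue<TElement, (TPriority, long)> _queue = new();
    private long _index = 0;

    public void Add(TElement element, TPriority priority)
    {
        lock (_queue) _queue.Enqueue(element, (priority, ++_index));
    }

    public (TElement, TPriority)[] ToArray()
    {
        lock (_queue)
        {
            // Copy while holding the lock; the LINQ query is fully evaluated
            // by ToArray() before the lock is released.
            return _queue.UnorderedItems
                .OrderBy(entry => entry.Priority)
                .Select(entry => (entry.Element, entry.Priority.Item1))
                .ToArray();
        }
    }
}
```

The copy costs O(n log n) per call because of the sort, which seems acceptable for occasional inspection but not for a hot path.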