Which thread-safe collection should I use to cache messages?

I'm working on a project with the following workflow:

  • A background service consumes messages from a RabbitMQ queue
  • The background service uses a background task queue (like this and here) to process tasks in parallel
  • Each task executes queries to retrieve some data and caches it in a collection

If the collection grows beyond 1000 objects, I would like to read the collection and then clear it. Since the tasks run in parallel, I don't want another thread to add data to the collection until it has been cleared.

There are BlockingCollection and ConcurrentDictionary (thread-safe collections), but I don't know which mechanism to use.

What's the best way to achieve this?

Wherever answered 3/7, 2022 at 13:51 Comment(8)
"I would like to read collection and then clear it." -- What do you mean by that? Do you mean that you want to discard the objects in the collection once there are more than 1000 of them, and you want to make the consumer of the collection responsible for discarding them? - Marga
When the collection contains 1000 objects, I group these objects by idShipment and send them to another queue. Why am I doing this? The messages I consume from RabbitMQ contain an IdShipment along with other information. I need to group this information by idShipment and send it to another queue. - Wherever
So by "I would like to read collection and then clear it." you actually mean "I would like to read all the messages that are currently in the collection.", correct? - Marga
Yes, that's correct. - Wherever
And what should happen in case the collection currently has 999 messages? Would you like to wait until the thousandth message comes, whenever this happens? Or would you also like to consume all the messages even if there are fewer than 1000, in case a specified amount of time has passed after either (1) the last batch or (2) the first message that was pushed in the collection after the last batch? - Marga
That's a very good point! I didn't think of that. The best solution would be to consume all the messages once a specified amount of time has passed after the first message was pushed in the collection. - Wherever
Aha. Here is a relevant question: How to batch a ChannelReader<T>, enforcing a maximum interval policy between consuming and processing any individual item? Do you prefer a collection where messages can be consumed asynchronously, or is it OK to block the thread of the consumer while waiting? - Marga
I prefer to consume messages asynchronously. - Wherever

The collection that seems most suitable for your case is the Channel<T>. This is an asynchronous version of the BlockingCollection<T>, and internally it's based on the same storage (the ConcurrentQueue<T> collection)¹. The similarities are:

  1. They both can be configured to be bounded or unbounded.
  2. A consumer can take a message, even if none is currently available. In this case the Take/ReadAsync call will block either synchronously or asynchronously until a message can be consumed, or the collection completes, whichever comes first.
  3. A producer can push a message, even if the collection is currently full. In this case the Add/WriteAsync call will block either synchronously or asynchronously until there is space available for the message, or the collection completes, whichever comes first.
  4. A consumer can enumerate the collection in a consuming fashion, with a foreach/await foreach loop. Each message received in the loop is consumed by this loop, and will never be available to other consuming loops that might be active by other consumers in parallel.
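To make the points above concrete, here is a minimal sketch of a bounded channel with one producer and one consuming `await foreach` loop. The class and method names (`ChannelBasics`, `RunAsync`) are made up for illustration, and the `int` payload stands in for whatever message type you actually cache.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class ChannelBasics
{
    // Pushes `count` integers through a bounded channel and returns what the consumer received.
    public static async Task<List<int>> RunAsync(int count, int capacity)
    {
        Channel<int> channel = Channel.CreateBounded<int>(capacity);

        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < count; i++)
            {
                // Waits asynchronously (no blocked thread) while the channel is full.
                await channel.Writer.WriteAsync(i);
            }
            // Signals that no more messages will come, so the consuming loop can end.
            channel.Writer.Complete();
        });

        var received = new List<int>();
        // Consuming enumeration: each message is removed from the channel as it is read.
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            received.Add(item);
        }

        await producer; // Surfaces any exception thrown by the producer.
        return received;
    }
}
```

With a single producer and a single consumer the messages arrive in the order they were written. With several consumers each message still goes to exactly one of them.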

Some features of the Channel<T> that the BlockingCollection<T> lacks:

  1. A Channel<T> exposes two facades, a Writer and a Reader, that allow a better separation between the roles of the producer and the consumer. In practice this can be more of an annoyance than a useful feature IMHO, but nonetheless it's part of the experience of working with a channel.
  2. A ChannelWriter<T> can be optionally completed with an error. This error is propagated to the consumers of the channel.
  3. A ChannelReader<T> has a Completion property of type Task.
  4. A bounded Channel<T> can be configured to be lossy, so that it drops old buffered messages automatically in order to make space for new incoming messages.
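A small sketch of two of the features listed above: completing the writer with an error that surfaces through `Reader.Completion`, and a lossy bounded channel configured with `BoundedChannelFullMode.DropOldest`. The class name `ChannelFeatures` and its method names are assumptions made for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class ChannelFeatures
{
    // Completes the writer with an error and returns the message observed on the reader side.
    public static async Task<string> CompleteWithErrorAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        channel.Writer.Complete(new InvalidOperationException("producer failed"));
        try
        {
            // The channel is empty and completed, so Completion faults with the writer's error.
            await channel.Reader.Completion;
            return "no error";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }

    // Writes `count` integers into a lossy channel and returns what is left in the buffer.
    public static List<int> DropOldest(int capacity, int count)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            // When the buffer is full, the oldest buffered message is discarded.
            FullMode = BoundedChannelFullMode.DropOldest
        };
        Channel<int> channel = Channel.CreateBounded<int>(options);

        for (int i = 0; i < count; i++)
        {
            channel.Writer.TryWrite(i);
        }

        var remaining = new List<int>();
        while (channel.Reader.TryRead(out int item))
        {
            remaining.Add(item);
        }
        return remaining;
    }
}
```

For the scenario in the question a lossy channel is probably not what you want, since dropped messages would never be grouped and forwarded. It is shown here only to illustrate the option.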

Some features of the BlockingCollection<T> that the Channel<T> lacks:

  1. There is no direct support for timeout when writing/reading messages. This can be achieved indirectly (but precariously, see below) with timer-based CancellationTokenSources.
  2. The contents of a channel cannot be enumerated in a non-consuming fashion.
  3. Some auxiliary features like the BlockingCollection<T>.TakeFromAny method are not available.
  4. A channel cannot be backed by other internal collections, other than the ConcurrentQueue<T>. So it can't have, for example, the behavior of a stack instead of a queue.
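To illustrate points 1 and 4, here is a rough sketch that contrasts the built-in timeout of `BlockingCollection<T>.TryTake` with the indirect, timer-based `CancellationTokenSource` approach for a channel, and shows a `BlockingCollection<T>` backed by a `ConcurrentStack<T>`. The class `TimeoutComparison` and its methods are hypothetical names, and the channel variant is subject to the caveat described further down.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class TimeoutComparison
{
    // BlockingCollection<T> supports a timeout directly.
    public static bool TryTakeWithTimeout(BlockingCollection<int> collection, TimeSpan timeout, out int item)
        => collection.TryTake(out item, timeout);

    // A channel has no timeout parameter, so a timer-based CancellationTokenSource is used instead.
    // Note: ReadAsync throws ChannelClosedException if the channel completes; that is not handled here.
    public static async Task<(bool Success, int Item)> TryReadWithTimeoutAsync(
        ChannelReader<int> reader, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            int item = await reader.ReadAsync(cts.Token);
            return (true, item);
        }
        catch (OperationCanceledException)
        {
            return (false, default);
        }
    }

    // A BlockingCollection<T> backed by a stack hands out the most recently added item first.
    public static int TakeFromStackBacked()
    {
        using var collection = new BlockingCollection<int>(new ConcurrentStack<int>());
        collection.Add(1);
        collection.Add(2);
        collection.Add(3);
        return collection.Take();
    }
}
```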

Other differences:

  1. A BlockingCollection<T> is created using the constructor of the class. On the contrary a Channel<T> is created using the static factory methods of the Channel class.
  2. The BlockingCollection<T> is not sealed, but it is practically not extendable (only the Dispose method is virtual and protected). It is possible though to inject custom functionality by passing in the constructor a custom IProducerConsumerCollection<T> implementation. On the contrary the Channel<T> is an abstract class, upon which derived classes can be built (if you are brave enough, and ready to handle the complexity of ValueTasks).

Caveat:

There is a nasty memory leak issue that is triggered when a channel is idle (empty with an idle producer, or full with an idle consumer), and at the other end a hyper-active consumer or producer attempts continuously to read/write messages with timer-based CancellationTokenSources. Each such canceled operation leaks about 800 bytes. The leak is resolved automatically when the first read/write operation completes successfully. This issue has been known for more than two years, and Microsoft has not decided yet what to do with it.

¹ Actually the only channels that are backed by a ConcurrentQueue<T> are unbounded channels configured with SingleReader = false (the default). Currently there are three built-in implementations, two for unbounded channels (UnboundedChannel<T>, SingleConsumerUnboundedChannel<T>) and one for bounded channels (BoundedChannel<T>).
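For the batching requirement discussed in the comments under the question (consume up to N messages, or whatever has arrived once a given interval has passed after the first message), one possible shape is sketched below. This is not the ReadAllBatches method from the linked answer; `ChannelBatching` and `ReadBatchesAsync` are hypothetical names, and the sketch assumes a single dedicated consumer. It cancels at most one pending wait per batch, which should keep the exposure to the leak described in the caveat small, although I have not measured that.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, not part of System.Threading.Channels.
public static class ChannelBatching
{
    // Yields batches of at most `batchSize` items. A batch is also emitted when `maxWait`
    // has elapsed since its first item became available, or when the channel completes.
    public static async IAsyncEnumerable<List<T>> ReadBatchesAsync<T>(
        ChannelReader<T> reader, int batchSize, TimeSpan maxWait,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Wait without any timeout for the first item of the next batch.
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            var batch = new List<T>(batchSize);

            // The interval starts once the first item of the batch is available.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(maxWait);
            try
            {
                while (batch.Count < batchSize)
                {
                    if (reader.TryRead(out var item))
                    {
                        batch.Add(item);
                        continue;
                    }
                    // Nothing buffered: wait for more items until the interval expires.
                    if (!await reader.WaitToReadAsync(cts.Token).ConfigureAwait(false))
                    {
                        break; // The channel completed.
                    }
                }
            }
            catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
            {
                // The interval expired: emit whatever has been collected so far.
            }

            if (batch.Count > 0)
            {
                yield return batch;
            }
        }
    }
}
```

The consumer would then be a single long-running task doing something like `await foreach (var batch in ChannelBatching.ReadBatchesAsync(channel.Reader, 1000, TimeSpan.FromMinutes(1)))`, grouping each batch by idShipment and sending the groups to the other queue.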

Marga answered 4/7, 2022 at 11:55 Comment(6)
Thank you for the detailed explanation! I will try to implement Channel<T> as you suggest. - Wherever
So I implemented my channel, and the consumed messages are added to it. There is a condition, for example if _cache.Reader.Count == 100, and then I read the channel like this: _cache.Reader.ReadAsync(token). It's working, but I don't know how to implement an interval between the first message and the last batch. For example, if there are new messages after 1 minute, I would like to read the channel. - Wherever
@JulienMartin check out the method ReadAllBatches that I have posted in this answer. It might be exactly what you want. - Marga
OK nice, I implemented your solution, but if my channel has 0 items, the ReadAsync method blocks here: item = (await source.ReadAsync(token).ConfigureAwait(false), true); - Wherever
The problem is that in the Consumer_Received event of RabbitMQ, I call my producer to write to the channel and immediately consume the channel. Maybe I should use a timer and consume the channel at each interval. - Wherever
@JulienMartin yes, this is how it works. The ReadAllBatches is supposed to be called by a consumer (a Task) whose sole responsibility is to consume the channel and process the consumed messages. It can't be used by a worker that also has to do some other work in parallel, apart from consuming the channel. If you have some special scenario that is not covered by this simple pattern, I would suggest posting a new question about it. - Marga

Check out ConcurrentQueue<T>. It appears to be suitable for the tasks you mentioned in your question. Documentation here - https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentqueue-1?view=net-6.0

There are other concurrent collection types as well - https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/

Mesa answered 3/7, 2022 at 14:43 Comment(1)
This isn't a great answer unless you can show how to use the ConcurrentQueue as per the OP's requirements. Answers that rely on links are frowned upon. - Chaotic
