How to use IObservable/IObserver with ConcurrentQueue or ConcurrentStack

I realized that when I process items in a concurrent queue with multiple threads while other threads may be adding items to it, the ideal solution would be to combine the Reactive Extensions with the concurrent data structures.

My original question is at:

While using ConcurrentQueue, trying to dequeue while looping through in parallel

So I am curious whether there is any way to write a LINQ (or PLINQ) query that continuously dequeues items as they are put into the queue.

I want this to work with n producers pushing into the queue and a limited number of threads processing it, so that I don't overload the database.

If I could use the Rx framework, I expect I could just start it, and if 100 items are added within 100 ms, the 20 threads that are part of the PLINQ query would simply work through the queue.

There are three technologies I am trying to work together:

  1. Rx Framework (Reactive LINQ)
  2. PLINQ
  3. System.Collections.Concurrent structures
Blink asked 13/6, 2010 at 0:47
Can you elaborate on how you expected Rx to help you here? - Astrict
@Richard Szalay - As I mentioned near the end, my thought is that I don't have to poll to see if anything is in the queue; I could just react when something is placed in there, so if a large number of items are suddenly pushed in, I could have several threads doing the processing. I am trying to avoid polling, which is what I am doing right now. - Blink

Drew is right. Although ConcurrentQueue sounds perfect for the job, it is actually the default underlying data structure that BlockingCollection uses. It seems very back to front to me too. Chapter 7 of this book* http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 explains how to use BlockingCollection with multiple producers and multiple consumers, each taking items off the "queue". Look at the GetConsumingEnumerable() method, and possibly just call .ToObservable() on it.

*the rest of the book is pretty average.
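The arrangement described above can be sketched with the BCL alone, before bringing Rx into it: a BlockingCollection<T> explicitly backed by a ConcurrentQueue<T>, several producer tasks, and several consumers that each call GetConsumingEnumerable(). This is a minimal sketch, not the answer's own code; the ProducerConsumerSketch class, its Run method and parameters are hypothetical names, and it assumes C# 8 or later (for `using var`).

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Hypothetical helper: `producers` tasks each add `itemsPerProducer` items, and
    // `consumers` tasks drain the collection via GetConsumingEnumerable().
    // Returns the total number of items consumed across all consumers.
    public static int Run(int producers, int itemsPerProducer, int consumers)
    {
        // Passing a ConcurrentQueue<T> makes the FIFO backing store explicit;
        // the parameterless BlockingCollection<T> constructor uses one by default.
        using var queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
        int consumed = 0;

        // Each consumer blocks inside GetConsumingEnumerable() until an item arrives;
        // its loop ends once CompleteAdding() has been called and the queue is empty.
        Task[] consumerTasks = Enumerable.Range(1, consumers)
            .Select(_ => Task.Run(() =>
            {
                foreach (string item in queue.GetConsumingEnumerable())
                {
                    Interlocked.Increment(ref consumed); // stand-in for real work
                }
            }))
            .ToArray();

        Task[] producerTasks = Enumerable.Range(1, producers)
            .Select(p => Task.Run(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                {
                    queue.Add($"Producer {p} item {i}");
                }
            }))
            .ToArray();

        Task.WaitAll(producerTasks);
        queue.CompleteAdding(); // no more items: consumers exit after draining
        Task.WaitAll(consumerTasks);
        return consumed;
    }
}
```

For example, ProducerConsumerSketch.Run(3, 100, 3) returns 300: every item is removed by exactly one consumer, and CompleteAdding() is what lets the consuming loops finish instead of waiting forever.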

Edit:

Here is a sample program that I think does what you want:

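using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency; // TaskPoolScheduler (System.Reactive package)
using System.Reactive.Linq;        // ToObservable() for IEnumerable<T>
using System.Threading;

class Program
{
    // Released once all consumers are subscribed, so the three producers start together.
    private static readonly ManualResetEvent _mre = new ManualResetEvent(false);

    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();

        // Each subscription enumerates GetConsumingEnumerable() on a task-pool thread,
        // so every item is removed by exactly one of the three consumers.
        // Early Rx builds exposed Scheduler.TaskPool; current System.Reactive
        // marks it obsolete in favour of TaskPoolScheduler.Default.
        theQueue.GetConsumingEnumerable()
            .ToObservable(TaskPoolScheduler.Default)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(TaskPoolScheduler.Default)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(TaskPoolScheduler.Default)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));

        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set(); // let all producers start adding

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    // Simulates work of a different cost for each consumer.
    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    // Starts a producer thread that waits for the signal, then adds 100 items.
    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
        {
            _mre.WaitOne();
            for (int i = 0; i < 100; i++)
            {
                target.Add(string.Format("{0} {1}", prefix, i));
            }
        });
        thread.Start();
    }
}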
Snitch answered 6/1, 2011 at 13:18
That's actually... ingenious, man, connecting Rx with a BlockingCollection. Wow, you can even build a pipeline with this: msdn.microsoft.com/en-us/library/ff963548.aspx - Seleneselenious
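The pipeline idea mentioned in this comment (stages connected through BlockingCollection<T> instances) can be sketched with the BCL alone. This is a hypothetical illustration, not taken from the linked article: the PipelineSketch name, the parse/square stages and the bounded capacity of 16 are arbitrary choices. The key convention is that each stage calls CompleteAdding() on its output in a finally block, so the next stage's GetConsumingEnumerable() loop can end even if the stage fails.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical two-stage pipeline: stage 1 parses strings to ints, stage 2 squares them.
    public static List<int> Run(IEnumerable<string> input)
    {
        // Bounded collections make a fast stage block instead of buffering without limit.
        using var parsed = new BlockingCollection<int>(boundedCapacity: 16);
        using var squared = new BlockingCollection<int>(boundedCapacity: 16);

        Task parseStage = Task.Run(() =>
        {
            try
            {
                foreach (string s in input)
                    parsed.Add(int.Parse(s));
            }
            finally { parsed.CompleteAdding(); } // signal the next stage
        });

        Task squareStage = Task.Run(() =>
        {
            try
            {
                foreach (int n in parsed.GetConsumingEnumerable())
                    squared.Add(n * n);
            }
            finally { squared.CompleteAdding(); }
        });

        // The calling thread acts as the final consumer.
        List<int> results = squared.GetConsumingEnumerable().ToList();
        Task.WaitAll(parseStage, squareStage); // surfaces any stage exception
        return results;
    }
}
```

Because each stage here is single-threaded and the default backing store is FIFO, the output order matches the input order; Run(new[] { "1", "2", "3" }) yields 1, 4, 9.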

I don't know how best to accomplish this with Rx, but I would recommend simply using BlockingCollection<T> and the producer-consumer pattern. Your main thread adds items to the collection, which uses ConcurrentQueue<T> underneath by default. Before that, you spin up a separate Task that runs Parallel.ForEach over the collection's consuming enumerable (GetConsumingEnumerable(); enumerating the BlockingCollection<T> directly only takes a snapshot and does not remove items) to process concurrently as many items as makes sense for the system. You will probably also want to look into the GetConsumingPartitioner extension method from the ParallelExtensionsExtras library in order to be most efficient, since the default partitioner, which takes items in chunks, will create more overhead than you want in this case. You can read more about this in this blog post.

When the main thread has finished adding items, call CompleteAdding on the BlockingCollection<T>, then call Wait on the Task you spun up so that you wait for all the consumers to finish processing the remaining items in the collection.
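The steps in this answer can be sketched as follows. One substitution is worth naming plainly: instead of GetConsumingPartitioner from ParallelExtensionsExtras, this sketch uses the BCL's Partitioner.Create with EnumerablePartitionerOptions.NoBuffering (available since .NET 4.5), which likewise stops Parallel.ForEach from grabbing items in chunks. MaxDegreeOfParallelism caps the number of concurrent workers, which is how the question's "limited number of threads, so I don't overload the database" could be expressed. The ParallelConsumerSketch class and its parameters are hypothetical, and the counter stands in for the real per-item work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelConsumerSketch
{
    // Hypothetical helper: the calling thread produces `itemCount` items while a
    // separate task consumes them with at most `maxWorkers` concurrent workers.
    public static int Run(int itemCount, int maxWorkers)
    {
        using var collection = new BlockingCollection<int>(); // ConcurrentQueue<int> by default
        int processed = 0;

        // NoBuffering: each worker takes one item at a time instead of a chunk,
        // so items are not held back in a worker's local buffer.
        var partitioner = Partitioner.Create(
            collection.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxWorkers };

        // Spun up before producing, so it is ready as soon as items arrive.
        Task consumer = Task.Run(() =>
            Parallel.ForEach(partitioner, options, item =>
            {
                Interlocked.Increment(ref processed); // stand-in for e.g. a database call
            }));

        for (int i = 0; i < itemCount; i++)
        {
            collection.Add(i);
        }

        collection.CompleteAdding(); // no more items are coming
        consumer.Wait();             // block until every item has been processed
        return processed;
    }
}
```

Calling ParallelConsumerSketch.Run(1000, 4) returns 1000 once all items have been processed; without the CompleteAdding() call, consumer.Wait() would never return because the consuming enumerable keeps waiting for more items.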

Wigan answered 1/12, 2010 at 5:26
The main catch with using BlockingCollection is that the consuming thread blocks. An observable pattern would only take up a thread when there is something to process. - Burly
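The blocking concern in this comment can be illustrated with a swapped-in technique that is neither Rx nor BlockingCollection: System.Threading.Channels (part of the base library since .NET Core 3.0, available as a NuGet package for .NET Framework). Consumers await new items, so while the channel is empty no thread is held. This is a hypothetical sketch under those assumptions; the ChannelConsumerSketch name and its parameters are illustrative, and the running sum stands in for real work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelConsumerSketch
{
    // Hypothetical helper: `consumers` async loops read from one unbounded channel.
    // Returns the sum of all processed items.
    public static async Task<long> RunAsync(int itemCount, int consumers)
    {
        var channel = Channel.CreateUnbounded<int>();
        long total = 0;

        Task[] workers = Enumerable.Range(0, consumers)
            .Select(_ => Task.Run(async () =>
            {
                // WaitToReadAsync completes with true when data is available, or with
                // false once the writer is completed and the channel is drained.
                // While waiting, the loop is suspended at the await, not blocking a thread.
                while (await channel.Reader.WaitToReadAsync())
                {
                    while (channel.Reader.TryRead(out int item))
                    {
                        Interlocked.Add(ref total, item); // stand-in for real work
                    }
                }
            }))
            .ToArray();

        for (int i = 1; i <= itemCount; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        channel.Writer.Complete(); // plays the role of BlockingCollection.CompleteAdding

        await Task.WhenAll(workers);
        return total;
    }
}
```

For example, awaiting ChannelConsumerSketch.RunAsync(100, 3) yields 5050 (the sum of 1 through 100), regardless of which consumer picked up which item.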
