Async Queue Processing With Reactive Extensions
Asked Answered
B

2

7

There are a couple of articles on this, and I have this working...but I want to know how to set a max number of Task threads for my Observable subscriptions at once.

I have the following to parallelize async saving of log entries:

private BlockingCollection<ILogEntry> logEntryQueue;

and

 logEntryQueue = new BlockingCollection<ILogEntry>();
 logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry);

To schedule my saving...but how do I specify the max threads for the scheduler to use at once?

Blader answered 30/6, 2011 at 16:55 Comment(1)
If you're creating schedulers solely to limits on max concurrent threads then consider taking a look at TPL Dataflow. It was built specifically to create pipelines where each block in the pipeline has different limits to concurrency. At least it was more comprehensible and maintainable for me when I prototyped both methods.Capablanca
C
9

This is not a function of the Observable, but a function of the Scheduler. The Observable defines what and the scheduler defines where.

You'd need to pass in a custom scheduler. A simple way to do this would be to subclass TaskScheduler and override the "MaximumConcurrencyLevel" property.

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

I actually found a sample of this on MSDN:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

Edit: You asked about how to go from TaskScheduler to IScheduler. Another developer just gave me that little bit of info:

var ischedulerForRx = new TaskPoolScheduler
(
    new TaskFactory
    (
        //This is your custom scheduler
        new LimitedConcurrencyLevelTaskScheduler(1)
    )
);
Cordiacordial answered 1/7, 2011 at 19:1 Comment(1)
Awesome, thanks...but how do I get a Reactive IScheduler for my new TaskScheduler? I don't want to re-implement the whole functionality.Blader
M
2

If you create your "work" as IObservable<T> with deferred execution (ie. they want do anything until subscribed to), you can use the Merge overload that accepts a number of maximum concurrent subscriptions:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize();

queue
    .Select(item => StartWork(item))
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes
    .Subscribe();

// To enqueue:
synchronizedQueue.OnNext(new QueueItem());
Melisma answered 2/7, 2011 at 0:33 Comment(8)
How does this handle other threads adding to the queue while the queue is processing? Would it not suffer from the collection being modified during enumeration?Blader
The Merge operator is thread safe, but I've updated my answer to use a thread-safe subject just in case.Melisma
I guess it's just my confusion here...it's not the thread safety that I'm concerned about, but rather that the queue is being modified (other items are being enqueued) as it's being enumerated... Is this not an issue?Blader
The queue in this example is a reactive "collection", which exists to accept new values over timeMelisma
It looks like Merge requires another IEnumerable to merge with? And Synchronize isn't an available extension method or method on Subject...? Am I doing something wrong?Blader
Synchronize is an extension method on ISubject in the newer builday of Ex. There should also be a merge overload that works as expected. What build are you using?Melisma
Synchronize works, but it returns an IObservable...so OnNext isn't available. Further, I can't find a Merge function that doesn't take another IObservable to merge with?Blader
Richard left out a critical assumption, that your StartWork function has the signature "IObservable<SomeResult> StartWork(QueueItem item)" - the Merge overload you're looking for only works with IObservable<IObservable<T>>Anodic

© 2022 - 2024 — McMap. All rights reserved.