Is there an asynchronous version of PLINQ?

I want to execute a query over a stream of data while processing items in parallel with a certain degree of parallelism. Normally, I'd use PLINQ for that, but my work items are not CPU bound but IO bound. I want to use async IO. PLINQ does not support async work.

What's the smartest way of running a PLINQ-style query, but with async work items?


Here's a more detailed illustration of the problem:

My goal is to process a potentially infinite stream of "items" in a way that is logically described by the following query:

var items = new int[10]; //simulate data

var results =
 from x in items.AsParallel().WithDegreeOfParallelism(100)
 where Predicate(x)
 select ComputeSomeValue(x);

foreach (var result in results)
 PerformSomeAction(result);

This query is just a sketch of the real query. Now I want each of the placeholder functions to be asynchronous (returning a Task and internally being based on async IO).
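As a sketch of what "asynchronous placeholders" could look like, the snippet below assumes hypothetical Task-returning versions named PredicateAsync, ComputeSomeValueAsync and PerformSomeActionAsync, with Task.Delay standing in for real async I/O. Awaiting them one by one in a plain loop is correct but fully sequential, so only one I/O operation is ever in flight. That is the gap the question is about.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical async placeholders; Task.Delay simulates an I/O-bound call.
async Task<bool> PredicateAsync(int x) { await Task.Delay(10); return x % 2 == 0; }
async Task<int> ComputeSomeValueAsync(int x) { await Task.Delay(10); return x * 3; }

var performed = new List<int>();
Task PerformSomeActionAsync(int r) { performed.Add(r); return Task.CompletedTask; }

var items = new int[10]; // simulate data, as in the question
for (int i = 0; i < items.Length; i++) items[i] = i;

// Correct, but sequential: each await finishes before the next call starts.
foreach (var x in items)
{
    if (await PredicateAsync(x))
        await PerformSomeActionAsync(await ComputeSomeValueAsync(x));
}
Console.WriteLine(string.Join(", ", performed)); // 0, 6, 12, 18, 24
```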

Note that there might be far more items than can be stored in memory. I also must control the degree of parallelism to max out the underlying network and disk hardware.
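One common way to meet both constraints (bounded memory, fixed degree of parallelism) is throttling with SemaphoreSlim while enumerating the source lazily. The following is a minimal sketch under that assumption; ForEachThrottledAsync is a hypothetical helper, not a framework API, and it only runs an action per item (no ordering, no result sequence).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs body(item) for each item with at most maxDegree calls in flight.
// The source is enumerated lazily; the next item is pulled only after a slot frees up,
// so an unbounded source never has more than maxDegree items being processed at once.
async Task ForEachThrottledAsync<T>(IEnumerable<T> source, int maxDegree, Func<T, Task> body)
{
    using var slots = new SemaphoreSlim(maxDegree);
    var running = new List<Task>();
    foreach (var item in source)
    {
        await slots.WaitAsync();                           // wait for a free slot
        running.RemoveAll(t => t.IsCompletedSuccessfully); // drop finished work; faulted tasks stay so WhenAll rethrows
        running.Add(RunAsync(item));
    }
    await Task.WhenAll(running);                           // wait for the remaining in-flight calls

    async Task RunAsync(T item)
    {
        try { await body(item); }
        finally { slots.Release(); }                       // free the slot even if body throws
    }
}

// Usage: 50 simulated I/O calls, at most 5 at a time.
var gate = new object();
int current = 0, peak = 0, done = 0;
await ForEachThrottledAsync(Enumerable.Range(0, 50), 5, async x =>
{
    int now = Interlocked.Increment(ref current);
    lock (gate) peak = Math.Max(peak, now);
    await Task.Delay(20);                                  // simulated I/O
    Interlocked.Decrement(ref current);
    Interlocked.Increment(ref done);
});
Console.WriteLine($"done={done}, peak={peak}");
```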

This question is not about multi-core. It fully applies to machines with only one CPU core because the IO can still benefit from parallelism. Think of slow web-service calls and the like.
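To illustrate why this works even on one core: async I/O waits do not occupy a thread, so many of them can overlap. In this small sketch Task.Delay plays the role of a slow web-service call (an assumption for illustration only).

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Ten simulated I/O calls of 500 ms each, all started before any is awaited.
// Task.Delay completes via a timer instead of blocking a thread, so the waits
// overlap even on a single core.
var sw = Stopwatch.StartNew();
await Task.WhenAll(Enumerable.Range(0, 10).Select(_ => Task.Delay(500)));
sw.Stop();
Console.WriteLine($"elapsed: {sw.ElapsedMilliseconds} ms"); // about 500 ms, not 5000 ms
```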

Asked by Reagan on 16/1, 2014 at 18:27. Comments (8):
+1, great question. I wonder if you could leverage completion ports on the IO side to achieve parallelism? Disclaimer: I've used them a lot in C++, but never in C#. (Lilylivered)
You have said that the "work items are not CPU bound but IO bound". Therefore, a large number of cores and CPU parallelism would not help much. If CPU usage is low and I/O usage is high for those two operations, then create n = 10 chained actions (ComputeSomeValue, continued by PerformSomeAction) and start the chains sequentially: new Task(ComputeSomeValue).ContinueWith(...), etc. (Corneous)
You can apply your filter using PLINQ (CPU parallelism), but that's all. You have to start tasks for the I/O part, IMHO. (Corneous)
Have you looked at TPL Dataflow? It's obviously more verbose than raw PLINQ, but it seems to fit exactly what you need. If you're awesome, create a set of LINQ bindings for it so that it can be expressed in LINQ syntax. (Illness)
Also check out Skeet's blog post "Eduasync part 19: ordering by completion, ahead of time". (Illness)
All placeholder functions in the example should be asynchronous, even the filter and the final action. I also want to keep this question general so that it applies to many situations. Please do not assume too much about what the functions do. (Reagan)
I'm OK with starting tasks myself (there is no other way to do it), but the query pipeline should be managed, because that seems pretty hard. (Reagan)
@usr, to clarify: the placeholders are Predicate, ComputeSomeValue and PerformSomeAction, and they should be async and return Task<X>, right? (Hubbub)
@Noseratio Yes. I hope this question will give some general insight and apply to all problems with a similar "pipeline" structure. (Reagan)
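For readers on newer frameworks: .NET 6 (released long after this thread) added Parallel.ForEachAsync, which pulls items from a source lazily and runs an async body with a bounded degree of parallelism. It does not produce an ordered result sequence, so this sketch collects into a ConcurrentBag; the async placeholders are hypothetical and Task.Delay simulates I/O.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical async placeholders (Task.Delay simulates I/O).
async Task<bool> PredicateAsync(int x, CancellationToken ct) { await Task.Delay(10, ct); return x % 2 == 0; }
async Task<int> ComputeSomeValueAsync(int x, CancellationToken ct) { await Task.Delay(10, ct); return x * 3; }

var results = new ConcurrentBag<int>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 100 };

// Requires .NET 6 or later. The body returns ValueTask; at most 100 bodies run at once.
await Parallel.ForEachAsync(Enumerable.Range(0, 1000), options, async (x, ct) =>
{
    if (await PredicateAsync(x, ct))
        results.Add(await ComputeSomeValueAsync(x, ct));
});
Console.WriteLine(results.Count); // 500
```

Results arrive in completion order; if the final action must see them in source order, something else (such as a TPL Dataflow TransformBlock) has to restore it.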

This sounds like a job for Microsoft's reactive framework.

I started with this code as my initial variables:

var items = Enumerable.Range(0, 10).ToArray();

Func<int, bool> Predicate = x => x % 2 == 0;

Func<int, int> ComputeSomeValue = x =>
{
    Thread.Sleep(10000);
    return x * 3;
};

Now, I used a regular LINQ query as a baseline:

var results =
    from x in items
    where Predicate(x)
    select ComputeSomeValue(x);

This took 50 seconds to compute the following results:

[Screenshot: results of the enumerable query]

Then I switched over to an observable (reactive framework) query:

var results =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select y;

This took 10 seconds to get:

[Screenshot: results of the observable query]

It's clearly computing in parallel.

However, the results are out of order. So I changed the query to this:

var query =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

var results =
    query
        .ToEnumerable()
        .OrderBy(z => z.x)
        .Select(z => z.y);

That still took 10 seconds, but I got the results back in the correct order.
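One caveat for the question's scenario: Enumerable.OrderBy must read the entire source before it yields its first element, so this ordering step cannot work on a potentially infinite stream. A sketch of an order-preserving alternative with only the BCL, assuming a hypothetical SelectOrderedAsync helper, keeps a fixed-size window of in-flight tasks and yields them in start order (the predicate stage is left out for brevity):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: starts selector(item) per item, keeps at most `window` calls
// in flight, and yields results in source order. A slow head item holds back later
// results (and the start of new work) until it finishes.
async IAsyncEnumerable<TResult> SelectOrderedAsync<T, TResult>(
    IEnumerable<T> source, Func<T, Task<TResult>> selector, int window)
{
    var inFlight = new Queue<Task<TResult>>();
    foreach (var item in source)
    {
        inFlight.Enqueue(selector(item));
        if (inFlight.Count >= window)
            yield return await inFlight.Dequeue();
    }
    while (inFlight.Count > 0)
        yield return await inFlight.Dequeue();
}

// Usage: later items finish first, but results still come back in source order.
var ordered = new List<int>();
await foreach (var r in SelectOrderedAsync(Enumerable.Range(0, 10), async x =>
{
    await Task.Delay((10 - x) * 20); // simulated I/O; x = 0 is the slowest
    return x * 3;
}, window: 4))
{
    ordered.Add(r);
}
Console.WriteLine(string.Join(", ", ordered)); // 0, 3, 6, 9, 12, 15, 18, 21, 24, 27
```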

Now, the only issue here is the WithDegreeOfParallelism. There are a couple of things to try here.

First, I changed the code to produce 10,000 values with a 10 ms compute time. My standard LINQ query still took 50 seconds, but the reactive query took 6.3 seconds. If it could perform all the computations at the same time, it should have taken much less. This suggests that it is already saturating the scheduler (by default, Observable.Start runs work on the thread pool), so some limit on concurrency is already in effect, just not one you set explicitly.

The second point is that the reactive framework uses schedulers for all of its work scheduling. You could try the variety of schedulers that come with the reactive framework to find an alternative if the default one doesn't do what you want, or you could even write your own scheduler to do whatever scheduling you like.
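Another option for the degree-of-parallelism issue, besides schedulers: Rx's Merge has an overload that takes maxConcurrent, which limits how many inner observables are subscribed at once. Combined with Observable.FromAsync, which only invokes its function when subscribed, this caps the number of Task-returning calls in flight. The sketch below assumes the System.Reactive NuGet package and a hypothetical ComputeSomeValueAsync; results arrive in completion order, not source order.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical async version of ComputeSomeValue; Task.Delay simulates I/O.
async Task<int> ComputeSomeValueAsync(int x) { await Task.Delay(100); return x * 3; }

var items = Enumerable.Range(0, 10).ToArray();

var results = items.ToObservable()
    .Where(x => x % 2 == 0)
    // FromAsync defers the call until subscription, so Merge(4)
    // keeps at most 4 calls in flight at any time.
    .Select(x => Observable.FromAsync(() => ComputeSomeValueAsync(x)))
    .Merge(4);

// Awaiting an IObservable yields its last element, so collect everything into a list first.
var list = await results.ToList();
Console.WriteLine(string.Join(", ", list.OrderBy(v => v))); // 0, 6, 12, 18, 24
```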


Here's a version of the query that computes the predicate in parallel too.

var results =
    from x in items.ToObservable()
    from p in Observable.Start(() => Predicate(x))
    where p
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };
Answered by Tommi on 16/1, 2014 at 23:10. Comments (3):
I have not used Rx before. Can the predicate and ComputeSomeValue be made fully async (returning a Task)? In your example, that would be Task.Delay. (Reagan)
You can convert observables to tasks and tasks to observables using the reactive framework. But in my experience, the reactive framework produces far more succinct code than the TPL. I'd suggest doing your computation solely in the reactive framework. (Tommi)
@Reagan - you can also make the call to Predicate an observable call, so that it also runs in parallel. The reactive framework does a nice job of mixing the scheduling of all parts of the query so that it is performed efficiently. (Tommi)
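Regarding the first comment's question: one way to use Task-returning functions in the answer's query shape is to replace Observable.Start with Observable.FromAsync. This is a sketch assuming the System.Reactive NuGet package, with hypothetical PredicateAsync and ComputeSomeValueAsync; as written it places no limit on concurrency.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical Task-returning versions of the placeholders; Task.Delay simulates I/O.
async Task<bool> PredicateAsync(int x) { await Task.Delay(50); return x % 2 == 0; }
async Task<int> ComputeSomeValueAsync(int x) { await Task.Delay(100); return x * 3; }

var items = Enumerable.Range(0, 10).ToArray();

// Same shape as the answer's query, with FromAsync in place of Observable.Start.
// No throttling: every item's predicate call starts as soon as the item is emitted.
var query =
    from x in items.ToObservable()
    from p in Observable.FromAsync(() => PredicateAsync(x))
    where p
    from y in Observable.FromAsync(() => ComputeSomeValueAsync(x))
    select new { x, y };

// Collect the pairs, then restore source order by x.
var pairs = await query.ToList();
foreach (var pair in pairs.OrderBy(z => z.x))
    Console.WriteLine($"{pair.x} -> {pair.y}");
```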

As stated here, PLINQ is for running LINQ queries in parallel on multi-core/multi-processor systems. It has little to do with systems that have many disk units and high-end networking. AFAIK, it's made for running executable code on more cores, not for concurrently dispatching multiple I/O requests to the operating system.

Maybe your Predicate(x) is CPU bound, in which case you may perform that filtering operation using PLINQ. But you cannot apply the I/O-demanding operations (ComputeSomeValue, PerformSomeAction) in the same way.

What you can do is define a chain of operations (two in your case) for each item (see continuation tasks) and dispatch those chains (perhaps sequentially).
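A per-item chain of the kind described above might look like this sketch, written once with ContinueWith and once with async/await. ComputeSomeValueAsync and PerformSomeActionAsync are hypothetical. Note that starting one chain per item all at once provides no limit on concurrent operations.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var performed = new ConcurrentBag<int>();

// Hypothetical async placeholders; Task.Delay simulates I/O.
async Task<int> ComputeSomeValueAsync(int x) { await Task.Delay(100); return x * 3; }
async Task PerformSomeActionAsync(int value) { await Task.Delay(10); performed.Add(value); }

// Continuation chain: compute, then act on the result. ContinueWith returns
// Task<Task> here because the continuation is itself async; Unwrap flattens it.
Task Chain(int x) =>
    ComputeSomeValueAsync(x)
        .ContinueWith(t => PerformSomeActionAsync(t.Result), TaskScheduler.Default)
        .Unwrap();

// The same chain with async/await, which also propagates exceptions without t.Result.
async Task ChainAwait(int x) => await PerformSomeActionAsync(await ComputeSomeValueAsync(x));

// One chain per item, all started immediately (no throttling).
await Task.WhenAll(Enumerable.Range(0, 5).Select(x => Chain(x)));
await Task.WhenAll(Enumerable.Range(5, 5).Select(x => ChainAwait(x)));
Console.WriteLine(performed.Sum()); // 135
```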

Also, you mentioned an "infinite stream of items". This sounds a bit like the producer-consumer problem, if those items are also I/O generated.
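A producer-consumer sketch of that idea, assuming System.Threading.Channels (included in .NET Core 3.0 and later, otherwise available as a NuGet package): a bounded channel keeps memory use fixed, and a fixed number of consumers gives a fixed degree of parallelism. ComputeSomeValueAsync is hypothetical and the filter step is omitted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical async work item; Task.Delay simulates I/O.
async Task<int> ComputeSomeValueAsync(int x) { await Task.Delay(20); return x * 3; }

const int degreeOfParallelism = 8;

// Bounded channel: WriteAsync waits while 100 items are queued,
// so an endless source cannot fill memory.
var channel = Channel.CreateBounded<int>(100);
var results = new List<int>();

var producer = Task.Run(async () =>
{
    foreach (var x in Enumerable.Range(0, 200)) // stands in for a potentially infinite stream
        await channel.Writer.WriteAsync(x);
    channel.Writer.Complete();
});

// A fixed number of consumers gives a fixed degree of parallelism.
var consumers = Enumerable.Range(0, degreeOfParallelism).Select(_ => Task.Run(async () =>
{
    await foreach (var x in channel.Reader.ReadAllAsync())
    {
        var value = await ComputeSomeValueAsync(x);
        lock (results) results.Add(value);
    }
})).ToArray();

await producer;
await Task.WhenAll(consumers);
Console.WriteLine(results.Count); // 200
```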

Maybe your problem is not very multi-core friendly; it may simply be I/O demanding, that's all.

Answered by Corneous on 16/1, 2014 at 20:56. Comments (2):
CPU load is not significant, but I do require async IO and a very high degree of parallelism to max out the IO. The question fully applies to machines with one CPU core, because the IO completions will be multiplexed onto that one core. (Reagan)
I could define a "chain of operators", but there are too many items to kick off all the work at once. I need a throttled/guaranteed level of parallelism. (Reagan)
@Reagan That sounds exactly like TPL Dataflow. (Marlow)
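A sketch of how the question's query could map onto TPL Dataflow, assuming the System.Threading.Tasks.Dataflow library is referenced (add the NuGet package of that name if your framework doesn't include it) and hypothetical async placeholders. MaxDegreeOfParallelism throttles the I/O stage, BoundedCapacity together with SendAsync keeps the source from being read faster than it is processed, and TransformBlock emits results in input order by default.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical async placeholders; Task.Delay simulates I/O.
async Task<bool> PredicateAsync(int x) { await Task.Delay(10); return x % 2 == 0; }
async Task<int> ComputeSomeValueAsync(int x) { await Task.Delay(50); return x * 3; }
var performed = new List<int>();
Task PerformSomeActionAsync(int r) { performed.Add(r); return Task.CompletedTask; }

// Up to 100 items in the I/O stage at once; BoundedCapacity caps buffered items.
var ioOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100, BoundedCapacity = 200 };

// Predicate and compute in one stage; output order matches input order (EnsureOrdered defaults to true).
var compute = new TransformBlock<int, (bool Keep, int Value)>(async x =>
{
    if (!await PredicateAsync(x)) return (false, 0);
    return (true, await ComputeSomeValueAsync(x));
}, ioOptions);

// Final action runs one item at a time (default MaxDegreeOfParallelism = 1), like the question's foreach.
var act = new ActionBlock<(bool Keep, int Value)>(async r =>
{
    if (r.Keep) await PerformSomeActionAsync(r.Value);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 200 });

compute.LinkTo(act, new DataflowLinkOptions { PropagateCompletion = true });

// SendAsync waits while the bounded block is full, so the source is consumed lazily.
foreach (var x in Enumerable.Range(0, 1000))
    await compute.SendAsync(x);
compute.Complete();
await act.Completion;
Console.WriteLine(performed.Count); // 500
```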
