Pitfalls of trying to use PLINQ over long-running generators?

I have a few infinite generator methods, including some long-running and infinitely-long-running generators.

IEnumerable<T> ExampleOne() { 
    while(true) // this one blocks for a few seconds at a time
        yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() { 
    while(true) //this one blocks for a really long time
        yield return OtherLongRunningFunction();
}

My goal is to have an infinite sequence that combines the items from the two examples. Here's what I tried, using PLINQ:

 IEnumerable<T> combined =  new[] { ExampleOne(), ExampleTwo() }
           .AsParallel()
           .WithMergeOptions(ParallelMergeOptions.NotBuffered)
           .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
           .SelectMany(source => source); // each element is already an IEnumerable<T>

This seems to combine the two IEnumerables appropriately into a new one, with items from IEnumerable #1 and #2 becoming available whenever they appear in either of the two source IEnumerables:

// assuming ExampleTwo yields TWO, but roughly 5 times
// less often than ExampleOne yields one
Example output:  one one one one one TWO one one one one one one TWO

However, it seems like sometimes (usually after many hours of running) OtherLongRunningFunction() will go for a long period of time without returning, and under conditions that are hard to reproduce, the combined sequence will block on it rather than continuing to return results from the first LongRunningFunction. It seems that although the combined parallel query started off using two threads, it decided to switch to one thread later on.

My first thought was "this is probably a job for Rx Observable.Merge and not for PLINQ." But I'd appreciate both answers that show correct alternative ways to handle this situation and explanations of the mechanics of how PLINQ can change the degree of parallelism hours after the start of a query.

Exorbitance answered 25/1, 2012 at 6:40 Comment(4)
I can't say I've got much experience with PLINQ, so this will mostly be unqualified guessing: it appears as if you assume that combining ExampleOne() and ExampleTwo() and running AsParallel() on the resulting IEnumerable will alternate strictly, returning one result from the 1st and then one result from the 2nd. Could it be that this assumption is false? If so, you can end up in a situation where the sequence being processed looks something like 1st 2nd 1st 2nd 1st 1st 1st 2nd 1st 2nd 1st 1st 2nd 1st 2nd 2nd 2nd 2nd... And that could explain why it appears as if you're stuck on number two. - Hills
@Nailuj It's actually more like I expect 1 1 1 1 1 1 1 2 1 1 1 1 1 1 1 2 etc. 2s are rather rare, and there can be long periods of time between consecutive 2s. I expect the combined sequence to continue returning 1s, and indeed, this is what actually happens for the most part. But sometimes it stops returning 1s as well. - Exorbitance
Another thought: could it be that PLINQ starts off by processing an equal number of 1s and 2s in parallel, but since the 1s finish much faster, the output looks like the sequence you described? Whenever a 1 finishes, that "spot" will be filled alternately by a 1 and a 2. The result is that in the beginning you get many 1s and just the rare 2, but in the long run the "queue" of tasks being processed fills up with 2s, and thus appears to be stuck. If not exactly like this, could it be somewhat related? Just trying to think out loud :-) - Hills
Did you actually verify this behaviour (by adding some diagnostic logging during the execution of both long-running functions, including the current thread ID)? - Immemorial

Here's the Rx way to do it, and indeed, it does use Merge:

IObservable<T> LongRunningFunction()
{
    return Observable.Start(() => {
        // Calculate some stuff
        return blah;
    }, Scheduler.TaskPool); // in Rx 2.x and later: TaskPoolScheduler.Default
}

Observable.Merge(
    Observable.Defer(LongRunningFunction).Repeat(),
    Observable.Defer(OtherLongRunningFunction).Repeat()
).Subscribe(x => {
    Console.WriteLine("An item: {0}", x);
});
Reception answered 25/1, 2012 at 7:25 Comment(0)

If you want the benefits of the TPL, especially for tasks with varying loads, I recommend TPL Dataflow. (Consider what should happen when your subscriber blocks while a number of items have already been produced: should you stop yielding items?)
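As a rough sketch of what that could look like (an illustration under assumptions, not a definitive implementation): each generator is pumped into a bounded ActionBlock, so a slow consumer holds the producers back instead of letting items pile up. The names DataflowMerge, Pump and RunAsync are hypothetical helpers, the capacity of 16 is arbitrary, and the types come from the System.Threading.Tasks.Dataflow package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow package

static class DataflowMerge
{
    // Hypothetical helper: drains one generator into the shared target.
    // LongRunning is a hint that the work should not occupy a pool thread.
    static Task Pump<T>(IEnumerable<T> source, ITargetBlock<T> target)
    {
        return Task.Factory.StartNew(() =>
        {
            foreach (var item in source)
            {
                // SendAsync completes only once the bounded target has room.
                // Blocking on it here is deliberate: this thread is dedicated.
                bool accepted = target.SendAsync(item).Result;
                if (!accepted) break; // target was completed or faulted
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Hypothetical entry point: 'handle' sees items from all sources in
    // arrival order. The returned task finishes only if every source ends.
    public static Task RunAsync<T>(Action<T> handle, params IEnumerable<T>[] sources)
    {
        var consumer = new ActionBlock<T>(
            handle,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 16 }); // arbitrary size

        var pumps = sources.Select(s => Pump(s, consumer)).ToArray();

        // Once all generators are exhausted (never, for infinite ones),
        // let the block finish processing what it already holds.
        Task.WhenAll(pumps).ContinueWith(_ => consumer.Complete());

        return consumer.Completion;
    }
}
```

With the generators from the question this would be called as DataflowMerge.RunAsync(x => Console.WriteLine(x), ExampleOne(), ExampleTwo()). By default an ActionBlock processes one item at a time, so the handler does not need to be thread-safe.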

If you want to do it with Rx, for really long running computational tasks, it's best not to block the thread pool:

var stream = Observable.Merge(ExampleTwo().ToObservable(Scheduler.NewThread), ExampleOne().ToObservable(Scheduler.NewThread));

stream.Subscribe(...);
Mineral answered 25/1, 2012 at 9:7 Comment(2)
If you're blocking while using Rx, you're Doing It Wrong(tm). Monopolizing the Task pool is not as bad as one would think because it is a task-stealing implementation. That being said, if you know you're going to park a bunch of threads on a CPU-intensive workflow, you could do the NewThread thing. - Reception
@PaulBetts Well said. You wrote in the TaskPool version, so I put in the thread version. Knowing about the options is always good. - Mineral

Concerning the mechanics of PLINQ:

I have encountered the same problem: I have a sequence whose items require uneven processing time, some of them taking longer by orders of magnitude. I experience thread starvation, which is much more reproducible on an 8-core processor than on a 4-core one, although it may happen on a 4-core machine as well after many hours of processing. Some threads might pick up work again after a while. Note that dynamic chunking is used, as in the example.

Observation: starvation is more likely to occur when several consecutive, very long-running work items complete.

The MSDN topic Parallel Loops sheds some light:

Be careful if you use parallel loops with individual steps that take several seconds or more. This can occur with I/O-bound workloads as well as lengthy calculations. If the loops take a long time, you may experience an unbounded growth of worker threads due to a heuristic for preventing thread starvation that's used by the .NET ThreadPool class's thread injection logic. This heuristic steadily increases the number of worker threads when work items of the current pool run for long periods of time. The motivation is to add more threads in cases where everything in the thread pool is blocked. Unfortunately, if work is actually proceeding, more threads may not necessarily be what you want. The .NET Framework can't distinguish between these two situations.

I still don't know the details, but I think that the underlying ThreadPool's heuristics don't cope well with very long-running work items: some upper limit is not adapted properly, so no threads are made available for the next iterations and those iterations stay queued. I don't have Visual Studio on the 8-core machine where the problem is more easily reproducible, and I have not yet been able to reproduce the problem under the Visual Studio debugger on a 4-core machine. Investigation continues.

For more details, the "Does the Task Parallel Library (or PLINQ) take other processes into account?" topic is highly relevant.
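If the thread pool's heuristics are indeed the cause, one way to sidestep them is to keep the generators off pool threads altogether. The following is a minimal sketch under that assumption, not something taken from PLINQ itself: every generator is drained by its own task created with TaskCreationOptions.LongRunning (a hint that, with the default scheduler, typically results in a dedicated thread), and the items meet in a BlockingCollection. GeneratorMerge, MergeOnDedicatedThreads and the capacity of 16 are hypothetical choices for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class GeneratorMerge
{
    // Hypothetical helper: starts draining every source immediately and
    // returns a sequence that produces items in the order they arrive.
    public static IEnumerable<T> MergeOnDedicatedThreads<T>(
        CancellationToken token, params IEnumerable<T>[] sources)
    {
        // Bounded, so a slow consumer makes the producers wait.
        var buffer = new BlockingCollection<T>(boundedCapacity: 16);

        var producers = sources.Select(source => Task.Factory.StartNew(() =>
        {
            foreach (var item in source)
            {
                // Throws OperationCanceledException once 'token' is cancelled.
                buffer.Add(item, token);
            }
        }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray();

        // When every producer has stopped (finished, failed or cancelled),
        // close the buffer so the consuming loop can end.
        // Note: an exception thrown by a generator is not rethrown here.
        Task.WhenAll(producers).ContinueWith(_ => buffer.CompleteAdding());

        return buffer.GetConsumingEnumerable(token);
    }
}
```

For the question's case this would read foreach (var item in GeneratorMerge.MergeOnDedicatedThreads(CancellationToken.None, ExampleOne(), ExampleTwo())) { ... }. A producer that is blocked inside its long-running function only notices cancellation when it next tries to add an item.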

Habit answered 27/9, 2013 at 12:6 Comment(0)