I would like to process some items in parallel. This processing is independent (order does not matter) and returns an output. These outputs should then be relayed back in order as quickly as possible.
In other words, the method should behave like the following, except that the processing calls run in parallel:
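```csharp
// Sequential reference behaviour: results come back in input order,
// each one as soon as it has been computed.
IEnumerable<TResult> OrderedParallelImmediateSelect<TSource, TResult> (IEnumerable<TSource> source, Func<TSource, TResult> process)
{
    foreach (var input in source) {
        var result = process (input);
        yield return result;
    }
}
```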
Accordingly, processing should be started in input order. Since items are (of course) not guaranteed to finish in that order, the result collector must wait for delayed results.
As soon as the next result in order comes in, it must be returned immediately. We cannot wait for the whole input to be processed before sorting the results.
Here is an example of how this could look:
```
begin 0
begin 1 <-- we start processing in increasing order
begin 2
complete 1 <-- 1 is complete but we are still waiting for 0
begin 3
complete 0 <-- 0 is complete, so we can return it and 1, too
return 0
return 1
begin 4
begin 5
complete 4 <-- 2 and 3 are missing before we may return this
complete 2 <-- 2 is done, 4 must keep waiting
return 2
begin 6
complete 3 <-- 3 and 4 can now be returned
return 3
return 4
```
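The collector behaviour in this trace can be sketched as a small reorder buffer: finished results are parked by index, and every time one arrives, all results from the next expected index onward that are already present get released. This is a minimal sketch, not a .NET API; `pending`, `next` and `Complete` are hypothetical names, and the item index stands in for the result. It replays the completion order from the trace (1, 0, 4, 2, 3).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reorder buffer: results parked by item index until
// every earlier index has completed.
var pending = new Dictionary<int, int>();
int next = 0; // index of the next result we are allowed to return

// Records that item `index` finished with `result` and returns every
// result that can now be released in order (possibly none).
List<int> Complete(int index, int result)
{
    pending[index] = result;
    var ready = new List<int>();
    while (pending.Remove(next, out var value))
    {
        ready.Add(value);
        next++;
    }
    return ready;
}

// Replay the completion order from the trace.
var released = new List<int>();
foreach (var index in new[] { 1, 0, 4, 2, 3 })
{
    var ready = Complete(index, index);
    released.AddRange(ready);
    Console.WriteLine($"complete {index} -> return [{string.Join(", ", ready)}]");
}
// Prints:
// complete 1 -> return []
// complete 0 -> return [0, 1]
// complete 4 -> return []
// complete 2 -> return [2]
// complete 3 -> return [3, 4]
```

In a real collector `Complete` would be called from worker threads, so access to `pending` and `next` would need to be synchronized (for example with a `lock`).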
If at all possible, I would like the processing to run on the regular thread pool.
Is this scenario something .NET provides a solution for? I've built a custom solution, but would prefer to use something simpler.
I'm aware of many similar questions, but they all seem to either wait for all items to finish processing or not guarantee ordered results.
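One possible sketch, assuming a bounded look-ahead window is acceptable: start each item with `Task.Run` (which schedules it on the regular thread pool), keep the started tasks in a FIFO queue, and hand results back by waiting on the oldest task first. The `maxInFlight` parameter and the generic signature are assumptions for illustration, not part of any .NET API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs `process` on the thread pool for up to
// `maxInFlight` items ahead of the consumer and yields results strictly
// in input order, each as soon as it and all earlier results are done.
IEnumerable<TResult> OrderedParallelImmediateSelect<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, TResult> process, int maxInFlight)
{
    var inFlight = new Queue<Task<TResult>>();
    foreach (var item in source)
    {
        inFlight.Enqueue(Task.Run(() => process(item)));
        // Window full: block until the oldest item finishes, then hand it out.
        // The other tasks in the window keep running meanwhile.
        if (inFlight.Count >= maxInFlight)
            yield return inFlight.Dequeue().GetAwaiter().GetResult();
    }
    // Source exhausted: drain the remaining tasks, still in order.
    while (inFlight.Count > 0)
        yield return inFlight.Dequeue().GetAwaiter().GetResult();
}

var output = new List<int>();
foreach (var r in OrderedParallelImmediateSelect(
    Enumerable.Range(0, 20),
    x => { Thread.Sleep(new Random(x).Next(10, 50)); return x * 10; },
    maxInFlight: 4))
{
    Console.WriteLine($"=> {r}");
    output.Add(r);
}
```

New items are only started when the consumer asks for the next result, so at most `maxInFlight` items run at once and a slow consumer also throttles processing. `GetAwaiter().GetResult()` rethrows an exception from `process` when the corresponding item is reached in order.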
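Here's an attempt that, sadly, does not work: it processes the items in parallel, but the results are only returned once every item has been processed (see the output below). Replacing `IEnumerable` with `ParallelQuery` had no effect.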
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

int Process (int item)
{
    Console.WriteLine ($"+ {item}");
    // Simulate work of varying length (100-999 ms, deterministic per item)
    Thread.Sleep (new Random (item).Next (100, 1000));
    Console.WriteLine ($"- {item}");
    return item;
}

void Output (IEnumerable<int> items)
{
    foreach (var it in items) {
        Console.WriteLine ($"=> {it}");
    }
}

IEnumerable<int> OrderedParallelImmediateSelect (IEnumerable<int> source)
{
    // This processes in parallel but does not return the results immediately
    return source.AsParallel ().AsOrdered ().Select (Process);
}

var input = Enumerable.Range (0, 20);
Output (OrderedParallelImmediateSelect (input));
```
Output:

```
+0 +1 +3 +2 +4 +5 +6 +7 +9 +10 +11 +8 -1 +12 -3 +13 -5 +14 -7 +15 -9 +16 -11 +17 -14 +18 -16 +19 -0 -18 -2 -4 -6 -8 -13 -10 -15 -17 -12 -19 =>0 =>1 =>2 =>3 =>4 =>5 =>6 =>7 =>8 =>9 =>10 =>11 =>12 =>13 =>14 =>15 =>16 =>17 =>18 =>19
```
`var result = source.AsParallel().AsOrdered().Select(item => Process(item));` We add `AsOrdered()` to ensure the initial order. – Seller

`ConcurrentQueue`? `Parallel.ForEach`? `Dataflow` (learn.microsoft.com/en-us/dotnet/standard/parallel-programming/…)? – Duley

`WithMergeOptions`. Note the comments on `ParallelMergeOptions`: "For some queries, such as those involving a sort ... will be ignored. However, queries that are created by using the `AsOrdered` operator can be streamed as long as no further sorting is performed within the query itself." – Ramadan

`AsOrdered` will cause full buffering? I was hoping that would not be the case since I only do a `Select` after. – Gleeman
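Following the `ParallelMergeOptions` documentation quoted in the comments, one variation worth trying is `WithMergeOptions(ParallelMergeOptions.NotBuffered)`, which asks PLINQ to make each result available to the consumer as soon as it has been computed. Combined with `AsOrdered`, a result should then be released once all earlier results exist. This is a sketch to experiment with, not a verified fix: how early results actually appear also depends on how PLINQ partitions the input, so check the `+`/`-`/`=>` interleaving in your own runs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

int Process(int item)
{
    Console.WriteLine($"+ {item}");
    Thread.Sleep(new Random(item).Next(100, 1000));
    Console.WriteLine($"- {item}");
    return item;
}

// Same query as in the question, but asking PLINQ not to buffer its output.
// AsOrdered still guarantees that results are yielded in input order.
IEnumerable<int> OrderedParallelImmediateSelect(IEnumerable<int> source) =>
    source.AsParallel()
          .AsOrdered()
          .WithMergeOptions(ParallelMergeOptions.NotBuffered)
          .Select(Process);

var results = new List<int>();
foreach (var it in OrderedParallelImmediateSelect(Enumerable.Range(0, 20)))
{
    Console.WriteLine($"=> {it}");
    results.Add(it);
}
```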