Parallel ordered consumable

I would like to process some items in parallel. This processing is independent (order does not matter) and returns an output. These outputs should then be relayed back in order as quickly as possible.

That is to say, the method should behave equivalently to this (except that it calls Process in parallel):

IEnumerable<T> OrderedParallelImmediateSelect<T> (IEnumerable<T> source)
{
    foreach (var input in source) {
        var result = Process (input);
        yield return result;
    }
}

Accordingly, it is required to try to process the items in order. As processing is (of course) not guaranteed to finish in order, the result collector must wait for delayed results.

As soon as the next result in order comes in, it must be returned immediately. We cannot wait for the whole input to be processed before sorting the results.

This is an example of how this could look:

begin 0
begin 1     <-- we start processing in increasing order
begin 2
complete 1  <-- 1 is complete but we are still waiting for 0
begin 3
complete 0  <-- 0 is complete, so we can return it and 1, too
return 0
return 1
begin 4
begin 5
complete 4  <-- 2 and 3 are missing before we may return this
complete 2  <-- 2 is done, 4 must keep waiting
return 2
begin 6
complete 3  <-- 3 and 4 can now be returned
return 3
return 4

If at all possible, I would like to perform processing on a regular thread pool.

Is this scenario something .NET provides a solution for? I've built a custom solution, but would prefer to use something simpler.
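
For illustration, here is a rough sketch of the kind of hand-rolled approach I mean (simplified, not my actual implementation): queue every item eagerly on the thread pool with Task.Run, then yield the results strictly in input order, each one as soon as its task has completed.

IEnumerable<TResult> OrderedEagerSelect<TSource, TResult> (IEnumerable<TSource> source, Func<TSource, TResult> process)
{
    // Task.Run queues the work on the regular thread pool.
    var tasks = source.Select (item => Task.Run (() => process (item))).ToList ();

    foreach (var task in tasks) {
        // Blocks only until this particular task is done, so each result is
        // returned as soon as it and all of its predecessors are available.
        yield return task.Result;
    }
}

The obvious drawbacks are that this materializes one task per input up front and does not limit the degree of parallelism, which is why I would prefer something built in.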

I'm aware of a lot of similar questions, but it seems they all either wait for all items to finish processing or do not guarantee ordered results.

Here's an attempt that sadly does not seem to work. Replacing IEnumerable with ParallelQuery had no effect.

int Process (int item)
{
    Console.WriteLine ($"+ {item}");
    Thread.Sleep (new Random (item).Next (100, 1000));
    Console.WriteLine ($"- {item}");
    return item;
}
void Output (IEnumerable<int> items)
{
    foreach (var it in items) {
        Console.WriteLine ($"=> {it}");
    }
}

IEnumerable<int> OrderedParallelImmediateSelect (IEnumerable<int> source)
{
    // This processes in parallel but does not return the results immediately
    return source.AsParallel ().AsOrdered ().Select (Process);
}

var input = Enumerable.Range (0, 20);
Output (OrderedParallelImmediateSelect (input));

Output:

+0 +1 +3 +2 +4 +5 +6 +7 +9 +10 +11 +8 -1 +12 -3 +13 -5 +14 -7 +15 -9 +16 -11 +17 -14 +18 -16 +19 -0 -18 -2 -4 -6 -8 -13 -10 -15 -17 -12 -19 =>0 =>1 =>2 =>3 =>4 =>5 =>6 =>7 =>8 =>9 =>10 =>11 =>12 =>13 =>14 =>15 =>16 =>17 =>18 =>19

Gleeman asked 3/9, 2018 at 13:15 Comment(7)
Parallel Linq? var result = source.AsParallel().AsOrdered().Select(item => Process (item)); We add AsOrdered() to ensure the initial order.Seller
ConcurrentQueue? Parallel.ForEach? Dataflow(learn.microsoft.com/en-us/dotnet/standard/parallel-programming/…)?Duley
So the processing is independent, but you can't complete the function until all tasks have been finished? "relayed back in order " - "We cannot wait" How can you sort the results until everything has finished?Kamilahkamillah
@Duley I agree that there likely is a solution, I just have not found one yet. I added one failed attempt with Plink to visualize what I am looking for.Gleeman
PLINQ looks good provided you tell it how to buffer using WithMergeOptions. Note the comments on ParallelMergeOptions: " For some queries, such as those involving a sort ... will be ignored. However, queries that are created by using the AsOrdered operator can be streamed as long as no further sorting is performed within the query itself."Ramadan
@Kamilahkamillah The results can complete in any order, so we do need to reorder them, of course. But by starting processing in monotonically increasing order, we will most likely get the results back in roughly increasing order, too; in that case they should be returned immediately.Gleeman
@Ramadan Thanks for highlighting this. Are you saying that the AsOrdered will cause a full buffering? I was hoping that would not be the case since I only do a Select after.Gleeman

I created this program as a console application:

using System;
using System.Linq;
using System.Threading;

namespace PlayAreaCSCon
{
    class Program
    {
        static void Main(string[] args)
        {
            var items = Enumerable.Range(0, 1000);
            int prodCount = 0;

            foreach(var item in items.AsParallel()
            .AsOrdered()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select((i) =>
            {
                Thread.Sleep(i % 100);
                Interlocked.Increment(ref prodCount);
                return i;
            }))
            {
                Console.WriteLine(item);
            }
            Console.ReadLine();
        }
    }
}

I then set a breakpoint on Console.WriteLine(item);. Running the program, the first time I hit that breakpoint prodCount was 5 - so we're definitely consuming results before all processing has completed. And after removing the breakpoint, all results appear to be produced in the original order.

Ramadan answered 3/9, 2018 at 14:18 Comment(1)
I'm glad this actually works. It can take a while before results start coming through, so a fairly large input is needed before it becomes visible that output is produced early - I failed to notice that at first.Gleeman

The ParallelMergeOptions.NotBuffered option disables the buffering of the output, but there is also buffering happening on the other side. PLINQ employs chunk partitioning by default, which means that the source is enumerated in chunks. This is easy to miss, because the chunks initially have a size of one and become progressively chunkier as the enumeration unfolds. To remove the buffering on the input side, you must use the EnumerablePartitionerOptions.NoBuffering option:

IEnumerable<int> OrderedParallelImmediateSelect(IEnumerable<int> source)
{
    return Partitioner
        .Create(source, EnumerablePartitionerOptions.NoBuffering)
        .AsParallel()
        .AsOrdered()
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .Select(Process);
}
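
To see the input-side buffering for yourself, as a quick diagnostic sketch you could replace the source with an iterator that logs every item it hands out (a hypothetical LoggedRange helper, not part of the question's code):

// Diagnostic sketch: logs the moment PLINQ pulls each item from the source.
IEnumerable<int> LoggedRange(int count)
{
    for (int i = 0; i < count; i++)
    {
        Console.WriteLine($"pulled {i}");
        yield return i;
    }
}

With the default chunk partitioning the "pulled" lines run progressively further ahead of the corresponding "+ item" lines, whereas with EnumerablePartitionerOptions.NoBuffering each item is pulled only just before it is processed.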

Something else you might be interested to know is that the current thread participates in the processing of the source, along with the ThreadPool threads. So if you have additional work to do while enumerating the resulting parallel query, this work will get less than the full power of a thread; it will be like running on a low-priority thread. If you don't want this to happen, you can offload the enumeration of the query to a separate ThreadPool thread, so that Process runs only on ThreadPool threads and the current thread is freed to dedicate itself to the work on the results. There is a custom OffloadEnumeration method in this answer that can be appended at the end of the query:

//...
.Select(Process)
.OffloadEnumeration();

...or used in the foreach loop:

foreach (var item in OffloadEnumeration(query)) // ...
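
That answer is not reproduced here, but as a rough idea, such a helper might look something like the sketch below (assuming a bounded BlockingCollection as the hand-over buffer and a Task.Run feeder; the version in the linked answer may differ in the details):

// Sketch only; early abandonment of the enumeration is not handled.
// Requires System.Collections.Concurrent and System.Threading.Tasks.
static IEnumerable<T> OffloadEnumeration<T>(IEnumerable<T> source, int boundedCapacity = 1)
{
    var buffer = new BlockingCollection<T>(boundedCapacity);
    var feeder = Task.Run(() =>
    {
        try { foreach (var item in source) buffer.Add(item); }
        finally { buffer.CompleteAdding(); } // unblock the consumer even if the source fails
    });
    foreach (var item in buffer.GetConsumingEnumerable())
        yield return item;
    feeder.GetAwaiter().GetResult(); // propagate a possible exception from the source
}

To use it in fluent form as in the first snippet, declare it as an extension method (this IEnumerable<T> source) inside a static class.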
Stoa answered 7/2, 2023 at 6:38 Comment(1)
Updooted. I cannot test this currently, may switch the checkmark over here laterGleeman
