Parallel.ForEach while retaining order of the results
I have a List<byte[]> and I'd like to deserialize each byte[] into a Foo. The list is ordered and I'd like to write a parallel loop in which the resulting List<Foo> contains all the Foo objects in the same order as the original byte[] items. The list is large enough to make a parallel operation worthwhile. Is there a built-in way to accomplish this?

If not, any ideas how to achieve a speedup over running this all synchronously?

Conatus answered 20/6, 2012 at 7:49 Comment(11)
Post some sample code please, and for the record, making things parallel doesn't necessarily make them faster or more efficient... Wonder if Parallel.For<T> would resolve your problem.Trost
This is a duplicate of this: #3640268. So to answer: you could use PLINQ (AsOrdered, AsParallel) to get the job done.Changeover
It would be better to have the parallel equivalent of Select or Map to retain the input order.Misshape
@adt: You gave the answer: Parallel.For solves the problem. Post this as an answer. I'll vote +1.Hynes
@adt, if I had code I would not need to ask ;-) I can add a simple for loop if that would help you?Conatus
Upvoted, your comment had a point :)) but actual code is always easier to see.Trost
@Trost true, in most cases I agree.Conatus
@Kadelka, I did not see that other question, thanks. However, I read somewhere that PLINQ might not offer any improvement in terms of processing speed when using AsOrdered(). Care to comment?Conatus
@Steven, you make a lot of assumptions. If I had it already, I would not have asked; not sure why you think otherwise.Conatus
@Steven, to be honest I fail to see how Parallel.For solves the problem. I do see how PLINQ's AsParallel().AsOrdered() gets the job done after Kadelka pointed me to it.Conatus
@Freddy: I think Dr. Adt showed this clearly in his answer. I have nothing more to add.Hynes
From the info you've given, I understand you want an output array of Foo equal in size to the input list of byte arrays. Is this correct?

If so, yes, the operation is simple. Don't bother with locking or synchronized constructs; these will erode all the speed-up that parallelization gives you.

Instead, if you obey this simple rule, any algorithm can be parallelized without locking or synchronization:

For each input element X[i] processed, you may read from any input element X[j], but only write to output element Y[i].


Look up scatter/gather; this type of operation is called a gather, as only one output element is written to.

If you can follow the above principle, then you want to create your output array Foo[] up front and use Parallel.For, not Parallel.ForEach, on the input array.

E.g.

        List<byte[]> inputArray = new List<byte[]>();
        int[] outputArray = new int[inputArray.Count];

        var waitHandle = new ManualResetEvent(false);
        int counter = 0;

        Parallel.For(0, inputArray.Count, index =>
            {
                // The loop passes the index in; do the long-running
                // operation on the input item, writing to only a
                // single output item.
                outputArray[index] = DoOperation(inputArray[index]);

                // Interlocked.Increment returns the incremented value,
                // so compare against Count, not Count - 1.
                if (Interlocked.Increment(ref counter) == inputArray.Count)
                {
                    waitHandle.Set();
                }
            });

        // Parallel.For itself blocks until all iterations complete;
        // the wait handle matters only if the work is kicked off
        // from another thread.
        waitHandle.WaitOne();

        // Optional conversion back to a list if you wanted this
        var outputList = outputArray.ToList();
Recount answered 20/6, 2012 at 10:7 Comment(3)
Marked your answer as the desired solution, though in the end I went with a custom IPropagatorBlock and TPL Dataflow through svick's help. Though this question and Dataflow seem like two very different concepts on the surface, parallel processing of data blocks within the IPropagatorBlock vastly outperformed any Parallel.For loop for my specific problem. I found that the synchronization overhead of the mapping and completion notification is very expensive, and that TPL Dataflow targets scenarios such as mine. But this in no way discounts your solution to my specific question...Conatus
...which did not include the issue of synchronization and completion notification. It was about Parallel.ForEach while retaining order, and your proposed solution fits the bill. Thanks a lot.Conatus
Good point! The Task Parallel Library is a superior solution. I'm surprised it outperforms, though. Perhaps it's throttling the tasks to execute more than one serialization per thread? For the above, you can include completion notification with an integer counter and Interlocked.Increment(ref counter) to test whether all elements have been processed, then set a wait handle to proceed.Recount
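For reference, a minimal sketch of the Dataflow approach discussed in these comments, using the stock TransformBlock rather than the custom IPropagatorBlock the OP ended up with; Foo and Deserialize are placeholders for the OP's actual types. A TransformBlock preserves input order on its output by default (EnsureOrdered is true), even when running in parallel.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

class Foo { } // placeholder for the OP's type

static class DataflowSketch
{
    // Hypothetical stand-in for the OP's deserialization logic.
    static Foo Deserialize(byte[] bytes) => new Foo();

    public static async Task<List<Foo>> DeserializeAllAsync(List<byte[]> source)
    {
        // The block runs Deserialize in parallel but emits results
        // in the order the inputs were posted.
        var block = new TransformBlock<byte[], Foo>(
            bytes => Deserialize(bytes),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            });

        foreach (byte[] bytes in source) block.Post(bytes);
        block.Complete();

        // Receive exactly as many items as were posted, in order.
        var results = new List<Foo>(source.Count);
        for (int i = 0; i < source.Count; i++)
            results.Add(await block.ReceiveAsync());

        await block.Completion; // surfaces any exceptions from the block
        return results;
    }
}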
You can use a thread-safe dictionary with an int index key to store the result of each deserialization, so at the end you will have all the data, ordered by key, in the dictionary.
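A minimal sketch of that idea, assuming a hypothetical Deserialize method: the parallel loop writes into a ConcurrentDictionary keyed by input index, and the ordered list is rebuilt afterwards.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

class Foo { } // placeholder for the OP's type

static class DictionarySketch
{
    // Hypothetical stand-in for the OP's deserialization logic.
    static Foo Deserialize(byte[] bytes) => new Foo();

    public static List<Foo> DeserializeAll(List<byte[]> input)
    {
        var results = new ConcurrentDictionary<int, Foo>();

        Parallel.For(0, input.Count, i =>
        {
            // Each iteration writes under its own index key, so no two
            // threads ever contend over the same entry.
            results[i] = Deserialize(input[i]);
        });

        // Rebuild the ordered list once the loop has completed.
        var ordered = new List<Foo>(input.Count);
        for (int i = 0; i < input.Count; i++) ordered.Add(results[i]);
        return ordered;
    }
}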

Levina answered 20/6, 2012 at 7:54 Comment(1)
Thanks, I am aware of that, but it sounds awfully complicated. I would basically need another full procedure to wait for the incoming Foo that is next in order, check for synchronicity, and add items in the right order into the resulting list. That looks like quite a bit of overhead, which may defeat the whole notion of running this in parallel.Conatus
It is easier to collect the results in an array instead of a List<Foo>. Assuming that the List<byte[]> is named source, you can do this:

Foo[] output = new Foo[source.Count];

ParallelOptions options = new() { MaxDegreeOfParallelism = Environment.ProcessorCount };

Parallel.ForEach(source, options, (byteArray, state, index) =>
{
    output[index] = Deserialize(byteArray);
});

Notice the absence of any kind of synchronization (lock etc.).

The above code works because concurrently updating an array is allowed, as long as each thread writes to an exclusive subset of its indices¹. After the completion of the Parallel.ForEach operation, the current thread will see the output array filled with fully initialized Foo instances, without needing to insert a memory barrier manually. The TPL automatically includes memory barriers at the end of task executions (citation), and Parallel.ForEach is based on Tasks internally (hence the TPL acronym, Task Parallel Library).

Collecting the results directly in a List<Foo> is more involved, because the List<T> collection is explicitly documented as not being thread-safe for concurrent write operations. You can update it safely using the lock statement, as shown below:

List<Foo> output = new(source.Count);
for (int i = 0; i < source.Count; i++) output.Add(default);

Parallel.ForEach(source, options, (byteArray, state, index) =>
{
    Foo foo = Deserialize(byteArray);
    lock (output) output[(int)index] = foo;
});

Notice that the lock protects only the update of the output list. The Deserialize call is not synchronized; otherwise the purpose of the parallelization would be defeated.

Starting from .NET 8 it is possible to preemptively fill a List<T> with uninitialized T values without writing a loop, using the advanced CollectionsMarshal.SetCount API (in the System.Runtime.InteropServices namespace):

List<Foo> output = new(source.Count); // Set the Capacity
CollectionsMarshal.SetCount(output, source.Count); // Set the Count
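
Putting that together with the lock-based loop above, a sketch reusing the source, options, and Deserialize from the earlier snippets:

using System.Runtime.InteropServices; // CollectionsMarshal

List<Foo> output = new(source.Count); // set the Capacity
CollectionsMarshal.SetCount(output, source.Count); // set the Count

Parallel.ForEach(source, options, (byteArray, state, index) =>
{
    Foo foo = Deserialize(byteArray);
    lock (output) output[(int)index] = foo; // same lock as before
});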

Alternative: It is even simpler if you are willing to switch from the Parallel.ForEach to PLINQ. With a PLINQ query you can collect the results of a parallel operation without relying on side-effects. Just use the ToList or the ToArray operators at the end of the query:

List<Foo> output = source
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .Select(byteArray => Deserialize(byteArray))
    .ToList();

Don't forget to include the AsOrdered operator; otherwise the order will not be preserved.

¹ Not documented explicitly, but generally agreed.

Monica answered 13/10, 2023 at 21:22 Comment(0)
