How the AsParallel extension actually works

So the question is in the title.

I understand that the AsParallel method returns a ParallelQuery<TSource> wrapper, which lets you use the same LINQ operators, but resolves them to System.Linq.ParallelEnumerable instead of System.Linq.Enumerable.

That's clear enough, but when I look at the decompiled sources, I don't understand how it works.

Let's begin with the simplest extension, the Sum() method. Code:

[__DynamicallyInvokable]
public static int Sum(this ParallelQuery<int> source)
{
  if (source == null)
    throw new ArgumentNullException("source");
  else
    return new IntSumAggregationOperator((IEnumerable<int>) source).Aggregate();
}

That's clear, so let's move on to the Aggregate() method. It's a wrapper around the InternalAggregate method that traps some exceptions. Now let's take a look at InternalAggregate:

protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}

And here is the question: how does it work? I see no concurrency safety for a variable that is modified by many threads; there is only an iterator and a summation. Is it a magic enumerator, or how does it work? GetEnumerator() returns a QueryOpeningEnumerator<TOutput>, but its code is too complicated.

Cityscape answered 22/8, 2013 at 12:59

On my second attempt at PLINQ I finally found the answer, and it's pretty clear. The point is that the enumerator is not a simple one; it's a special multithreaded enumerator. So how does it work? The enumerator doesn't return the next value of the source; it returns the complete sum of the next partition. So this loop body executes only as many times as there are partitions (2, 4, 6, 8..., based on Environment.ProcessorCount), while the actual summation work is performed inside the enumerator, in enumerator.MoveNext, which opens the query through the enumerator's OpenQuery method.

So TPL obviously partitions the source enumerable, sums each partition independently, and then performs this final summation; see IntSumAggregationOperatorEnumerator<TKey>. There's no magic here; I just had to dig deeper.
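A minimal sketch of the partition-then-combine mechanism described above, under loud assumptions: PartitionedSumSketch and PartialSums are hypothetical names, partitions are contiguous array ranges run with Task.Run, and this is not PLINQ's actual code (the real enumerators also handle merging, cancellation and exception propagation). It shows why the loop in Sum, which has the same shape as InternalAggregate, needs no locking: each task writes only its own local variable, and the loop only ever sees one value per partition.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PartitionedSumSketch
{
    // Hypothetical helper: splits the array into contiguous ranges, sums each
    // range on its own task, and yields one partial sum per range.
    // Because this is an iterator, nothing runs until the first MoveNext call,
    // which starts all the tasks (loosely mirroring "opening" the query).
    public static IEnumerable<int> PartialSums(int[] source, int partitionCount)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        int chunk = (source.Length + partitionCount - 1) / partitionCount;
        var tasks = new List<Task<int>>();
        for (int start = 0; start < source.Length; start += chunk)
        {
            int from = start;                                   // fresh copy per iteration for the closure
            int to = Math.Min(from + chunk, source.Length);
            tasks.Add(Task.Run(() =>
            {
                int local = 0;                                  // private to this task: nothing shared
                for (int i = from; i < to; i++)
                    checked { local += source[i]; }
                return local;
            }));
        }

        // Each Result blocks until that partition's task has finished.
        foreach (Task<int> t in tasks)
            yield return t.Result;
    }

    // Same shape as InternalAggregate: a plain sequential loop,
    // but it only ever receives one partial sum per partition.
    public static int Sum(int[] source)
    {
        using (IEnumerator<int> enumerator = PartialSums(source, Environment.ProcessorCount).GetEnumerator())
        {
            int num = 0;
            while (enumerator.MoveNext())
                checked { num += enumerator.Current; }
            return num;
        }
    }
}
```

With PartialSums(new[] { 1, 2, 3, 4, 5 }, 2) the loop sees only two values, 6 and 9, instead of five.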

Cityscape answered 29/2, 2016 at 13:15

The Sum operator aggregates all values on a single thread. There is no multithreading here. The trick is that the multithreading happens somewhere else.

The PLINQ Sum method accepts PLINQ enumerables (ParallelQuery<T>). Those enumerables could be built up using other operators (such as Where) that allow a collection to be processed on multiple threads.
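For illustration, a sketch contrasting a sequential Where/Sum chain with its PLINQ counterpart. The class and method names are made up for this example; Where, AsParallel and the Sum overload taking a Func<int, long> selector are standard LINQ/PLINQ operators. DistinctThreadsUsed records which managed threads run the Where predicate; on a multi-core machine it typically reports more than one, but that is not guaranteed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class WhereThenSumDemo
{
    // Sequential LINQ: Where and Sum both run on the calling thread.
    public static long SequentialEvenSum(int count) =>
        Enumerable.Range(0, count).Where(x => x % 2 == 0).Sum(x => (long)x);

    // PLINQ: AsParallel() switches to the ParallelEnumerable overloads, so the
    // Where predicate runs on several worker threads over partitions of the range.
    public static long ParallelEvenSum(int count) =>
        Enumerable.Range(0, count).AsParallel().Where(x => x % 2 == 0).Sum(x => (long)x);

    // Counts the distinct managed threads that executed the Where predicate.
    public static int DistinctThreadsUsed(int count)
    {
        var ids = new ConcurrentDictionary<int, bool>();
        Enumerable.Range(0, count).AsParallel()
            .Where(x =>
            {
                ids.TryAdd(Environment.CurrentManagedThreadId, true);
                return x % 2 == 0;
            })
            .Sum(x => (long)x);
        return ids.Count;
    }
}
```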

The Sum operator is always the last operator in a chain. Although it is possible to process this sum over multiple threads, the TPL team probably found out that this had a negative impact on performance, which is reasonable, since the only thing this method has to do is a simple integer addition.

So this method takes all the results as they become available from other threads, processes them on a single thread, and returns that value. The real trick is in the other PLINQ extension methods.

Chiquita answered 22/8, 2013 at 13:18
If "There is no multithreading here", why is sum2 calculated 3 times faster than sum1? i.imgur.com/Z4CtyUz.png – Cityscape
@AlexJoukovsky: I tested this and the AsParallel version is indeed faster, but I have no idea how that's possible, especially since the input enumerable is also processed on a single thread at a time. Bizarre. – Chiquita
Partial sums could be processed in parallel, then combined. – Wendall
@usr: That's true, but as Alex has shown, the current PLINQ Sum implementation doesn't process this in parallel. – Chiquita
I don't see Sum1 & Sum2? – Buffon
@JeroenvanLangen: Unfortunately Alex deleted his comment. This was his code: var a = Enumerable.Repeat(1, 10000000).ToArray(); var s1 = new Stopwatch(); var s2 = new Stopwatch(); s1.Start(); int sum1 = a.Sum(); s1.Stop(); s2.Start(); int sum2 = a.AsParallel().Sum(); s2.Stop(); Console.WriteLine(s1.Elapsed); Console.WriteLine(s2.Elapsed + " (parallel)"); – Chiquita
That benchmark is bound by the single-threaded reading from the sequence. It does not stress the summing operator at all. Try ParallelEnumerable. – Wendall
Or does PLINQ specialize for the array? I'd need to step through this in the debugger or profiler to be convinced. – Wendall
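A variant of Alex's benchmark following the "Try ParallelEnumerable" suggestion in the comments, assuming it refers to PLINQ's own generator methods such as ParallelEnumerable.Repeat (an assumption; the comment doesn't name a method). Here the source is produced by PLINQ itself rather than read from an existing sequence. RepeatSumBenchmark is a hypothetical name. Timings depend heavily on the machine and on warm-up (the first PLINQ query also pays for JIT compilation and thread-pool start-up), so no expected numbers are given; run it a few times before comparing.

```csharp
using System;
using System.Diagnostics;
using System.Linq;

public static class RepeatSumBenchmark
{
    // Times a sequential and a PLINQ sum over 'count' copies of the value 1.
    public static (int Sequential, int Parallel, TimeSpan SeqTime, TimeSpan ParTime) Run(int count)
    {
        var sw = Stopwatch.StartNew();
        int seq = Enumerable.Repeat(1, count).Sum();          // LINQ to Objects, one thread
        TimeSpan seqTime = sw.Elapsed;

        sw.Restart();
        int par = ParallelEnumerable.Repeat(1, count).Sum();  // source generated by PLINQ
        TimeSpan parTime = sw.Elapsed;

        return (seq, par, seqTime, parTime);
    }
}
```

Usage: var r = RepeatSumBenchmark.Run(10000000); Console.WriteLine($"{r.SeqTime} vs {r.ParTime} (parallel)");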
protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}

This code won't be executed in parallel; the while loop executes its inner scope sequentially.

Try this instead:

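        using System.Collections.Generic;
        using System.Threading.Tasks;

        List<int> list = new List<int>(); // fill with the values to sum

        int num = 0;

        // Not thread-safe: "num += item" is a read-modify-write, so iterations
        // running on different threads can overwrite each other's updates.
        Parallel.ForEach(list, (item) =>
            {
                checked { num += item; }
            });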

The inner action will be spread across the ThreadPool, and the ForEach statement will complete when all items have been handled.

Here you need thread safety:

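        using System.Collections.Generic;
        using System.Threading;
        using System.Threading.Tasks;

        List<int> list = new List<int>(); // fill with the values to sum

        int num = 0;

        // Interlocked.Add makes each addition atomic; unlike checked, it wraps on overflow.
        Parallel.ForEach(list, (item) =>
            {
                Interlocked.Add(ref num, item);
            });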
Buffon answered 22/8, 2013 at 13:18
It isn't my code; it's the decompiled source of .NET Framework 4.5, and it works. – Cityscape
Of course it works, except it isn't multithreaded. It is cast to an IEnumerable (IntSumAggregationOperator((IEnumerable<int>) source)), so it is still executed sequentially by the while loop. I don't think there is any reason to compute Sum multithreaded, because all the values affect the result. – Buffon
The absence of boxing cannot be the reason for a 3x performance difference. – Cityscape
The code in this answer is synchronizing the entirety of the work done, so the end result is that it will perform worse than a single-threaded solution. The way you get a speedup doing a sum in a multi-core environment is partitioning the input into several batches, summing up those batches in parallel, and then combining the results of those batches. Note this can have multiple levels, in which each batch is broken into sub-batches, which are run in parallel, and so on. – Fauces
This answer was about multithreading, and about InternalAggregate not processing a ParallelQuery in a multithreaded way, so I gave an example. Of course this will be slower. – Buffon
See the bottom of the question. – Buffon
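The batching approach described in the Fauces comment can be sketched with the Parallel.ForEach overload that takes thread-local state (localInit, body, localFinally). LocalSubtotalSum is a hypothetical name, and the subtotal uses long instead of checked int arithmetic, so this is an illustration rather than a drop-in replacement for either snippet in the answer. Each worker accumulates its own subtotal without synchronization, and only the final combination of subtotals uses Interlocked, once per worker instead of once per item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class LocalSubtotalSum
{
    public static long Sum(IEnumerable<int> source)
    {
        long total = 0;
        Parallel.ForEach(
            source,
            () => 0L,                                              // localInit: a fresh subtotal per worker
            (item, loopState, subtotal) => subtotal + item,        // body: touches only the local subtotal
            subtotal => Interlocked.Add(ref total, subtotal));     // localFinally: one atomic add per worker
        return total;                                              // all localFinally calls have run by now
    }
}
```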

© 2022 - 2024 — McMap. All rights reserved.