How to run LINQ 'let' statements in parallel?

I have code like this:

var list = new List<int> {1, 2, 3, 4, 5};

var result = from x in list.AsParallel()
             let a = LongRunningCalc1(x)
             let b = LongRunningCalc2(x)
             select new {a, b};

Let's say the LongRunningCalc methods each take 1 second. The code above takes about 2 seconds to run, because while the list of 5 elements is operated on in parallel, the two methods called from the let statements are called sequentially.

However, these two methods can also safely run in parallel with each other. Their results need to be merged for the select, which should wait for both of them, but until then they should run concurrently.

Is there a way to achieve this?

Figured answered 28/10, 2014 at 17:34

You won't be able to use query syntax or the let operation, but you can write a method to perform multiple operations for each item in parallel:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TFinal>(
    this ParallelQuery<T> query,
    Func<T, TResult1> selector1,
    Func<T, TResult2> selector2,
    Func<TResult1, TResult2, TFinal> resultAggregator)
{
    return query.Select(item =>
    {
        var result1 = Task.Run(() => selector1(item));
        var result2 = Task.Run(() => selector2(item));
        return resultAggregator(result1.Result, result2.Result);
    });
}

This would allow you to write:

var query = list.AsParallel()
    .SelectAll(LongRunningCalc1,
        LongRunningCalc2,
        (a, b) => new {a, b});
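
For anyone who wants to try this end to end, here is a minimal self-contained sketch. The LongRunningCalc bodies are placeholders I made up (a 200 ms sleep plus trivial arithmetic), AsOrdered is added only so the output order is predictable, and the two-selector SelectAll is repeated so the snippet compiles on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SelectAllExtensions
{
    // Copy of the two-selector SelectAll from this answer.
    public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TFinal>(
        this ParallelQuery<T> query,
        Func<T, TResult1> selector1,
        Func<T, TResult2> selector2,
        Func<TResult1, TResult2, TFinal> resultAggregator)
    {
        return query.Select(item =>
        {
            // Start both calculations, then block until both have finished.
            var result1 = Task.Run(() => selector1(item));
            var result2 = Task.Run(() => selector2(item));
            return resultAggregator(result1.Result, result2.Result);
        });
    }
}

public static class SelectAllDemo
{
    // Hypothetical stand-ins for the question's methods (assumption: 200 ms each).
    public static int LongRunningCalc1(int x) { Thread.Sleep(200); return x * 2; }
    public static int LongRunningCalc2(int x) { Thread.Sleep(200); return x * x; }

    public static (int a, int b)[] Run()
    {
        var list = new List<int> {1, 2, 3, 4, 5};
        var stopwatch = Stopwatch.StartNew();

        var results = list.AsParallel()
            .AsOrdered() // keep results in list order
            .SelectAll(LongRunningCalc1, LongRunningCalc2, (a, b) => (a, b))
            .ToArray();  // forces the query to execute

        Console.WriteLine($"Elapsed: {stopwatch.ElapsedMilliseconds} ms");
        return results;
    }
}
```

Call SelectAllDemo.Run() from a Main method to see the timing on your machine. How close it gets to a single sleep interval depends on the number of cores and available thread-pool threads, since each PLINQ worker blocks on .Result while its tasks run.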

You can add overloads for additional parallel operations as well:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TResult3, TFinal>(
    this ParallelQuery<T> query,
    Func<T, TResult1> selector1,
    Func<T, TResult2> selector2,
    Func<T, TResult3> selector3,
    Func<TResult1, TResult2, TResult3, TFinal> resultAggregator)
{
    return query.Select(item =>
    {
        var result1 = Task.Run(() => selector1(item));
        var result2 = Task.Run(() => selector2(item));
        var result3 = Task.Run(() => selector3(item));
        return resultAggregator(
            result1.Result,
            result2.Result,
            result3.Result);
    });
}

It's possible to write a version to handle a number of selectors not known at compile time, but to do that they all need to compute a value of the same type:

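public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query,
    IEnumerable<Func<T, TResult>> selectors)
{
    return query.Select(item => selectors.AsParallel()
            .AsOrdered() // results come back in the same order as the selectors
            .Select(selector => selector(item))
            .ToList()    // run the selectors now rather than when the result is enumerated
            .AsEnumerable());
}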
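
public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query,
    params Func<T, TResult>[] selectors)
{
    // The cast selects the IEnumerable overload; without it, overload
    // resolution picks this params overload again and recurses forever.
    return SelectAll(query, (IEnumerable<Func<T, TResult>>)selectors);
}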
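
As a rough illustration of calling the variable-count version, here is a small self-contained sketch. Twice, Square and Negate are hypothetical calculations invented for the example, the method is a condensed copy taking the params array directly, and the AsOrdered and ToList calls are my own assumptions about the desired behaviour (results in selector order, evaluated eagerly):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SelectAllParamsSketch
{
    // Condensed copy of the params overload so the sketch compiles on its own.
    public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
        this ParallelQuery<T> query,
        params Func<T, TResult>[] selectors)
    {
        return query.Select(item => selectors.AsParallel()
            .AsOrdered()                        // keep results in selector order
            .Select(selector => selector(item))
            .ToList()                           // evaluate eagerly
            .AsEnumerable());
    }

    // Hypothetical calculations; all return int, as this overload requires.
    public static int Twice(int x) => x * 2;
    public static int Square(int x) => x * x;
    public static int Negate(int x) => -x;

    public static List<int[]> Run()
    {
        return new[] {1, 2, 3}.AsParallel()
            .AsOrdered() // keep rows in source order
            .SelectAll<int, int>(Twice, Square, Negate)
            .Select(results => results.ToArray())
            .ToList();
    }
}
```

Each element of the returned list holds the three results for one input, in the order the selectors were passed.
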
Sergias answered 28/10, 2014 at 17:47 Comment(2)
This is not my area of expertise, but don't you need to wait for the results of the tasks? – Arose
Yes, calling .Result waits for the Task to finish. I oversimplified my sample code a little, but this has put me on the right track. Thanks for the help. – Figured

I would do this using Microsoft's Reactive Framework ("Rx-Main" in NuGet, since renamed "System.Reactive").

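Here it is. Observable.Start begins each calculation immediately, and Zip pairs the two results once both are available. (Chaining two separate from clauses over Observable.Start would only start the second calculation after the first had produced its value.)

var result =
    from x in list.ToObservable()
    from pair in Observable.Start(() => LongRunningCalc1(x))
        .Zip(Observable.Start(() => LongRunningCalc2(x)), (a, b) => new {a, b})
    select pair;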

The nice thing is that you can access the results as they are produced using the .Subscribe(...) method:

result.Subscribe(x => /* Do something with x.a and/or x.b */ );

Super simple!

Esse answered 31/10, 2014 at 1:28
