How can I report progress from a PLINQ query?

I would like to report progress from a long running PLINQ query.

I can't really find any native LINQ method that allows me to do this (as was implemented for cancellation).

I have read this article that shows a neat extension function for a regular serialized query.

I have been testing the behavior using the below code.

var progress = new BehaviorSubject<int>(0);
DateTime start = DateTime.Now;
progress.Subscribe(x => { Console.WriteLine(x); });
Enumerable.Range(1,1000000)
    //.WithProgressReporting(i => progress.OnNext(i)) //Beginning Progress
    .AsParallel()
    .AsOrdered()
    //.WithProgressReporting(i => progress.OnNext(i)) //Middle Progress reporting
    .Select(v => { Thread.Sleep(1); return v * v; })
    //.WithProgressReporting(i => progress.OnNext(i)) //End Progress Reporting
    .ToList();
Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");

Edit:
Reporting progress from the middle using the IEnumerable<T> extension removes the parallelism.

Reporting from the end does not report any progress while the parallel calculations are running, then quickly reports all the progress at the very end. I assume this is the progress of collecting the results of the parallel computation into a list.

I originally thought that reporting progress from the beginning was causing the LINQ query to run unparallelized. After sleeping on this and reading the comments from Peter Duniho, I see that it is actually running in parallel, but I am getting so many progress reports that handling them slows my test/application significantly.

Is there a way that is parallel/thread safe to report progress from a PLINQ in increments that allow a user to know progress is being made without having a significant impact on the method runtime?

Episcopate answered 9/4, 2019 at 2:2 Comment(1)
Your question is not very clear. Why doesn't the non-parallel WithProgressReporting() method serve your purpose adequately? Typically, you'd be starting with IEnumerable<T> anyway...just wrap your source IEnumerable<T> with the call to WithProgressReporting() and call AsParallel() on that, as you've done in your tests. Ultimately the throughput is going to be the same, whether you report progress on the source or the result. You need to be more specific: post a minimal reproducible example and explain precisely what output you expect, and what you're getting instead. - Landloper

This answer might not be as elegant, but it gets the job done.

When using PLINQ, there are multiple threads processing your collection, so using those threads to report progress results in multiple (and out-of-order) progress reports like 0% 1% 5% 4% 3% etc...

Instead, we can use those multiple threads to update a shared variable storing the progress. In my example, it's a local variable completed. We then spawn another thread using Task.Run() to report on that progress variable at 0.5s intervals.

Extension class:

static class Extensions
{
    public static ParallelQuery<T> WithProgressReporting<T>(this ParallelQuery<T> sequence, Action increment)
    {
        return sequence.Select(x =>
        {
            increment?.Invoke();
            return x;
        });
    }
}

Code:

static void Main(string[] args)
{
    const int total = 100000;
    long completed = 0;
    // Background task: polls the shared counter every 0.5 s and prints the percentage.
    Task.Run(() =>
    {
        // Interlocked.Read gives an atomic read of the 64-bit counter on all platforms.
        while (Interlocked.Read(ref completed) < total)
        {
            Console.WriteLine((Interlocked.Read(ref completed) * 100 / total) + "%");
            Thread.Sleep(500);
        }
    });
    DateTime start = DateTime.Now;
    var output = Enumerable.Range(1, total)
        .AsParallel()
        .WithProgressReporting(() => Interlocked.Increment(ref completed))
        .Select(v => { Thread.Sleep(1); return v * v; })
        .ToList();
    Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");
    Console.ReadKey();
}
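
A variation on the same idea, as a minimal sketch rather than a definitive implementation: instead of polling from a second thread, the worker threads can report themselves, but only when the whole-number percentage increases. The names ThrottledProgress and WithPercentReporting are hypothetical, and the sketch assumes the total item count is known up front.

```csharp
using System;
using System.Linq;
using System.Threading;

static class ThrottledProgress
{
    // Hypothetical helper: passes items through unchanged and invokes
    // 'report' only when the integer percentage increases.
    // The counters belong to one call, so build a new query for each run.
    public static ParallelQuery<T> WithPercentReporting<T>(
        this ParallelQuery<T> source, long total, Action<int> report)
    {
        long done = 0;
        int lastReported = -1;
        return source.Select(x =>
        {
            long n = Interlocked.Increment(ref done);
            int percent = total > 0 ? (int)(n * 100 / total) : 100;

            // Compare-and-swap loop: only the thread that raises the highest
            // percentage seen so far gets to invoke the callback.
            int seen = Volatile.Read(ref lastReported);
            while (percent > seen)
            {
                int prev = Interlocked.CompareExchange(ref lastReported, percent, seen);
                if (prev == seen) { report(percent); break; }
                seen = prev;
            }
            return x;
        });
    }
}
```

Placing .WithPercentReporting(total, p => Console.WriteLine(p + "%")) after the Select that does the work counts finished items rather than started ones. Each percentage is reported at most once, but the callback runs on the worker threads, so it must be thread safe, and callbacks from different threads may still arrive slightly out of order.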
Neal answered 9/4, 2019 at 2:58 Comment(0)

The linked article by Samuel Jack provides two overloads of the WithProgressReporting operator, one with itemCount parameter (second) and one without (first). According to the author's instructions, you have to use the overload that has the itemCount parameter:

The second variant can be used if generating the items in the sequence takes time (so you want to report progress of this part as well), but you know in advance how many there will be: it doesn't attempt to buffer the sequence before passing the items through to the output sequence.

You also have to use the PLINQ operator WithMergeOptions(ParallelMergeOptions.NotBuffered), otherwise the elements produced by the PLINQ query will not be propagated immediately downstream, causing undesirable delays in the progress reporting.

Samuel Jack's implementation reports progress for each and every element produced by the source sequence, which is too much. Below is a more sophisticated implementation, that reports progress at most 101 times, from 0 to 100:

/// <summary>
/// Reports progress 0-100.
/// </summary>
public static IEnumerable<TSource> WithProgressReporting<TSource>(
    this IEnumerable<TSource> source, long itemsCount,
    IProgress<int> progress)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(progress);
    if (itemsCount < 0)
        throw new ArgumentOutOfRangeException(nameof(itemsCount));

    progress.Report(0);
    long i = 0;
    long next = GetNext(0, itemsCount, out _);
    foreach (TSource item in source)
    {
        i++;
        if (i == next)
        {
            next = GetNext(i, itemsCount, out int percentDone);
            progress.Report(percentDone);
        }
        yield return item;
    }
    if (i == 0 || itemsCount == 0) progress.Report(100);

    static long GetNext(long itemsDone, long itemsCount, out int percentDone)
    {
        if (itemsCount == 0) { percentDone = 0; return long.MaxValue; }
        checked
        {
            // Calculate the percent of the work done, rounding down
            percentDone = (int)(itemsDone * 100 / itemsCount);
            // Calculate the next iteration to report, rounding up
            long next = (((percentDone + 1) * itemsCount - 1) / 100) + 1;
            Debug.Assert(next > itemsDone);
            return next;
        }
    }
}

Usage example:

IProgress<int> progress = new Progress<int>(e => Console.WriteLine(e));
long sum = Enumerable.Range(1, 1_000_000)
    .AsParallel()
    .AsOrdered()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(n =>
    {
        Thread.SpinWait(100); // Simulate some lightweight computation
        return (long)n;
    })
    .AsSequential() // End of parallelism
    .WithProgressReporting(1_000_000, progress) // <--- the extension method
    .Sum();

I've included the AsSequential operator, although it's not needed, to signify visually the completion of the parallel part of the query. Both the WithProgressReporting and the Sum are standard sequential LINQ operators, not parallel.

Online demo.

The GetNext local function does some precise integer divisions, in order to calculate the next iteration that needs to trigger a progress report. I have tested and validated the correctness of the calculations, for all possible itemsCount values in the range 0 - 10,000.
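
A check of this kind can be sketched as a brute-force loop. This is an illustrative reconstruction, not the actual test that was run: the class name GetNextCheck and its methods are hypothetical, GetNext is copied from the implementation above, and Simulate assumes the source yields exactly itemsCount items.

```csharp
using System;
using System.Collections.Generic;

static class GetNextCheck
{
    // Same arithmetic as the GetNext local function above.
    static long GetNext(long itemsDone, long itemsCount, out int percentDone)
    {
        if (itemsCount == 0) { percentDone = 0; return long.MaxValue; }
        checked
        {
            percentDone = (int)(itemsDone * 100 / itemsCount);
            return (((percentDone + 1) * itemsCount - 1) / 100) + 1;
        }
    }

    // Replays the reporting loop for a source with exactly itemsCount items
    // and returns the percentages that would be reported, in order.
    public static List<int> Simulate(long itemsCount)
    {
        var reports = new List<int> { 0 };
        long next = GetNext(0, itemsCount, out _);
        for (long i = 1; i <= itemsCount; i++)
        {
            if (i != next) continue;
            next = GetNext(i, itemsCount, out int percentDone);
            reports.Add(percentDone);
        }
        if (itemsCount == 0) reports.Add(100);
        return reports;
    }

    // Valid means: starts at 0, ends at 100, strictly increasing,
    // and no more than 101 reports in total.
    public static bool IsValid(long itemsCount)
    {
        List<int> r = Simulate(itemsCount);
        if (r.Count > 101 || r[0] != 0 || r[r.Count - 1] != 100) return false;
        for (int k = 1; k < r.Count; k++)
            if (r[k] <= r[k - 1]) return false;
        return true;
    }
}
```

Calling GetNextCheck.IsValid(n) for every n from 0 to 10,000 covers the range mentioned above. For a small case such as itemsCount = 3, Simulate yields 0, 33, 66, 100.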

The built-in Progress<T> class invokes the handler asynchronously on the captured synchronization context. If there is no synchronization context, for example in a console application, the handler is invoked on the ThreadPool. This might be undesirable, because the handler might then be invoked after the PLINQ query has completed. You can solve this problem by using a synchronous IProgress<T> implementation, like the SynchronousProgress<T> that I've posted here. Alternatively, you could replace the IProgress<int> progress parameter with an Action<int> reportProgress, as in Samuel Jack's article.
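
For illustration, a synchronous IProgress<T> can be as small as the sketch below. The class name InlineProgress<T> is hypothetical and this is not the linked SynchronousProgress<T>; it only shows the general shape, assuming the handler is safe to run on whichever thread calls Report.

```csharp
using System;

// Hypothetical minimal implementation: the handler runs on the calling
// thread, and Report returns only after the handler has finished.
sealed class InlineProgress<T> : IProgress<T>
{
    private readonly Action<T> _handler;

    public InlineProgress(Action<T> handler)
    {
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    public void Report(T value) => _handler(value);
}
```

In the usage example above, replacing new Progress<int>(...) with new InlineProgress<int>(e => Console.WriteLine(e)) would make every report complete before the next item is pulled, at the cost of running the handler inline with the query's consumer.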

Frenum answered 9/4, 2019 at 11:11 Comment(0)
