Rx extensions: Where is Parallel.ForEach?

I have a piece of code that uses Parallel.ForEach, probably written against an old version of the Rx extensions or the Task Parallel Library. I installed a current version of the Rx extensions but cannot find Parallel.ForEach. I'm not using anything else from the library and just want to process some data in parallel like this:

Parallel.ForEach(records, ProcessRecord);

I found this question, but I would rather not depend on an old version of Rx. I was not able to find anything similar in the current Rx, so what is the most straightforward way to do this with a current Rx version? The project targets .NET 3.5.

Asked by Vinylidene on 19/12/2011 at 8:40. Comments:
possible duplicate of Parallel.ForEach missing from Reactive Extensions for .Net 3.5 (Gibrian)
@Hasan: I have linked to the question you mention, so I was obviously aware of it. But the answer there proposes using an old Rx version, which I don't want to use. (Vinylidene)
@dtb: That's not an option. I'm using only that single snippet, which should be easy to replace with current Rx code. At least I would expect it to be! ;-) I don't want to switch versions just because of this one line of code. (Vinylidene)
I still think this is the same question, even if the answer wasn't satisfactory. I also think your options may come down to staying on an old version, upgrading the .NET Framework, or porting the Parallel.ForEach method yourself (no idea how feasible this is; I'd personally crack it open in ILSpy and find out before writing it off). Since the author of that other question didn't post an answer themselves, you could post a bounty on it asking for an answer that doesn't involve using an old version of Rx. (Uraeus)

No need to do all this silly goosery if you have Rx (the trailing .ToList().First() waits until every item has been processed, so this blocks just like Parallel.ForEach does):

records.ToObservable()
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPool))
    .ToList()
    .First();

Or, if you want the results delivered in the original order of the items, at some cost in efficiency:

records.ToObservable()
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPool))
    .Concat()
    .ToList()
    .First();

Or, if you want to limit how many items are processed at the same time:

records.ToObservable()
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPool)))
    .Merge(5 /* at a time */)
    .ToList()
    .First();
Answered by Shama on 19/12/2011 at 18:04. Comments:
How would you go about throttling this so that only n items are processed at any given time? I'm trying this with hundreds of items and it's trying to process them all at once, which is leading to huge memory issues in my app. I'd like to be able to restrict it to, say, 5 at a time. (Mythify)
Updated to include limiting concurrency. (Shama)
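If Rx is not available at all, the throttling asked about in the comment above can also be sketched with plain .NET 3.5 types: start a fixed number of worker threads that each pull the next item from a shared enumerator under a lock. The class name ThrottledParallel and its signature are hypothetical, not part of Rx or the framework. In this sketch the first exception (from body or from enumerating the source) stops further items from being started and is rethrown on the calling thread, wrapped in an InvalidOperationException.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not an Rx or BCL API): runs body for each item with at most
// maxConcurrency items in flight, using only types available in .NET 3.5.
static class ThrottledParallel
{
    public static void ForEach<T>(IEnumerable<T> source, int maxConcurrency, Action<T> body)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (body == null) throw new ArgumentNullException("body");
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException("maxConcurrency");

        object gate = new object();
        Exception failure = null;

        using (IEnumerator<T> items = source.GetEnumerator())
        {
            ThreadStart worker = () =>
            {
                try
                {
                    while (true)
                    {
                        T item;
                        // Only one worker advances the shared enumerator at a time.
                        lock (gate)
                        {
                            // Stop once the source is exhausted or another worker has failed.
                            if (failure != null || !items.MoveNext())
                            {
                                return;
                            }
                            item = items.Current;
                        }
                        // body runs outside the lock, so up to maxConcurrency calls overlap.
                        body(item);
                    }
                }
                catch (Exception ex)
                {
                    // Keep the first failure; it is rethrown on the caller below.
                    lock (gate)
                    {
                        if (failure == null)
                        {
                            failure = ex;
                        }
                    }
                }
            };

            var threads = new List<Thread>();
            for (int i = 0; i < maxConcurrency; i++)
            {
                var thread = new Thread(worker);
                thread.IsBackground = true;
                thread.Start();
                threads.Add(thread);
            }
            // Block until every worker has run out of items, like Parallel.ForEach does.
            foreach (var thread in threads)
            {
                thread.Join();
            }
        }

        if (failure != null)
        {
            throw new InvalidOperationException("An item could not be processed.", failure);
        }
    }
}
```

For example, `ThrottledParallel.ForEach(records, 5, ProcessRecord);` would process at most five records at a time. Dedicated threads are used instead of the thread pool so that exactly `maxConcurrency` workers exist; that is a design choice of this sketch, not a requirement.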

Here's a simple replacement:

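using System;
using System.Collections.Generic;
using System.Threading;

// Drop-in stand-in for Parallel.ForEach: queues every item on the thread pool
// and blocks until all of them have been processed.
// Note: CountdownEvent is part of the .NET Framework only from 4.0 on.
static class Parallel
{
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }
        // Materialize the source so the item count is known before queuing.
        var items = new List<T>(source);
        var countdown = new CountdownEvent(items.Count);
        WaitCallback callback = state =>
        {
            try
            {
                body((T)state);
            }
            finally
            {
                // Always signal, even if body throws.
                // (An exception escaping a thread-pool callback still terminates the process.)
                countdown.Signal();
            }
        };
        foreach (var item in items)
        {
            ThreadPool.QueueUserWorkItem(callback, item);
        }
        // Returns once every queued item has called Signal().
        countdown.Wait();
    }
}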
Answered by Danziger on 19/12/2011 at 8:53. Comments:
That looks promising, but how does the code know when all items have been processed? Parallel.ForEach blocks until all items have been processed, so I don't have to care about that. (Vinylidene)
It's obviously a simplification of what Parallel.ForEach does. I've expanded my answer to block until all items have been processed. (Danziger)
It's worth considering just what the trade-off is here compared to upgrading to 4.0. The authors of Parallel.ForEach put a lot of work into getting a good balance between degree of concurrency and overhead. @Danziger, on the other hand, gave a good answer that takes up about 30 LoC. If you find this answer does all you need well, then happy days. If you find yourself tweaking and tweaking, and the code here growing to hundreds of LoC, then you no longer have a good short version but a bad long version, where you're essentially rewriting more and more of what 4.0's Parallel class already provides, tested. (Demodulation)
Doesn't compile in .NET 3.5 due to the CountdownEvent class. (Olva)
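Following up on the comment above, a .NET 3.5-only variant of this helper can replace CountdownEvent with a ManualResetEvent and an Interlocked counter. This is a sketch, assuming only the 3.5 base class library is available. An empty source now needs an explicit early return, because nothing would ever set the event. The sketch also catches an exception thrown by body and rethrows it on the calling thread, wrapped in an InvalidOperationException; the wrapper type is a choice made here (.NET 4.0's Parallel.ForEach uses AggregateException instead).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch of a Parallel.ForEach stand-in that uses only .NET 3.5 types:
// ManualResetEvent plus an Interlocked counter instead of CountdownEvent.
static class Parallel
{
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }
        var items = new List<T>(source);
        if (items.Count == 0)
        {
            return; // nothing is queued, so nothing would ever set the event
        }

        int remaining = items.Count;
        Exception firstError = null;
        using (var done = new ManualResetEvent(false))
        {
            WaitCallback callback = state =>
            {
                try
                {
                    body((T)state);
                }
                catch (Exception ex)
                {
                    // Record only the first failure instead of letting it kill the process.
                    Interlocked.CompareExchange(ref firstError, ex, null);
                }
                finally
                {
                    // The last item to finish releases the waiting caller.
                    if (Interlocked.Decrement(ref remaining) == 0)
                    {
                        done.Set();
                    }
                }
            };
            foreach (var item in items)
            {
                ThreadPool.QueueUserWorkItem(callback, item);
            }
            done.WaitOne(); // blocks until every item has been processed
        }

        if (firstError != null)
        {
            throw new InvalidOperationException("Processing an item failed.", firstError);
        }
    }
}
```

Because the class keeps the name Parallel, the question's call `Parallel.ForEach(records, ProcessRecord);` compiles against it unchanged.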

In case someone finds this thread nowadays, the updated version of the answer would be the following (PLINQ, available from .NET 4.0 onwards):

records.AsParallel().WithDegreeOfParallelism(5).ForAll(x => ProcessRecord(x));

Answered by Avernus on 7/9/2022 at 16:03.
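For projects that can move to .NET 4.0 or later, the sketch below puts the PLINQ call from this answer next to the built-in Parallel.ForEach overload that takes ParallelOptions, which is the closest equivalent of the question's original one-liner with a concurrency cap. The class and method names are hypothetical; `System.Threading.Tasks.Parallel` is fully qualified so it cannot be confused with a hand-rolled class named Parallel like the one in the earlier answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical wrapper showing two built-in (.NET 4.0+) ways to process
// records in parallel with a cap on concurrency. Both calls block until done.
static class ModernForEach
{
    // PLINQ, as in the answer above: ForAll runs processRecord on at most
    // maxDegreeOfParallelism concurrent tasks; no ordering is guaranteed.
    public static void WithPlinq<T>(IEnumerable<T> records, int maxDegreeOfParallelism, Action<T> processRecord)
    {
        records.AsParallel()
               .WithDegreeOfParallelism(maxDegreeOfParallelism)
               .ForAll(processRecord);
    }

    // TPL: the real Parallel.ForEach, with the cap set through ParallelOptions.
    public static void WithParallelForEach<T>(IEnumerable<T> records, int maxDegreeOfParallelism, Action<T> processRecord)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        System.Threading.Tasks.Parallel.ForEach(records, options, processRecord);
    }
}
```

With .NET 4.0 available, the question's original line `Parallel.ForEach(records, ProcessRecord);` compiles unchanged against System.Threading.Tasks; the ParallelOptions overload is only needed when you want to cap concurrency.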
