Parallel.ForEach on a BlockingCollection causes steady increase of threads
I observed a strange behavior while experimenting with the Parallel.ForEach method and the BlockingCollection<T> class. Apparently, calling the two lines below on a separate thread is enough to cause an ever-increasing number of threads in the ThreadPool:

var queue = new BlockingCollection<int>();
Parallel.ForEach(queue.GetConsumingEnumerable(), _ => { });

The queue contains no elements. I was expecting the Parallel.ForEach loop to stay idle, waiting for items to be added to the queue. Apparently it's not idle, because the ThreadPool.ThreadCount increases by one every second. Here is a minimal demo:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        new Thread(() =>
        {
            var queue = new BlockingCollection<int>();
            Parallel.ForEach(queue.GetConsumingEnumerable(), _ => { });
        })
        { IsBackground = true }.Start();

        Stopwatch stopwatch = Stopwatch.StartNew();
        while (true)
        {
            Console.WriteLine($"ThreadCount: {ThreadPool.ThreadCount}");
            if (stopwatch.ElapsedMilliseconds > 8000) break;
            Thread.Sleep(1000);
        }
        Console.WriteLine("Finished");
    }
}

Output:

ThreadCount: 0
ThreadCount: 4
ThreadCount: 5
ThreadCount: 6
ThreadCount: 7
ThreadCount: 8
ThreadCount: 10
ThreadCount: 11
ThreadCount: 12
Finished

Online demo.

Can anyone explain why this is happening, and how to prevent it? Ideally I would like the Parallel.ForEach to consume at most one ThreadPool thread while the queue is empty.

I am searching for a solution applicable on .NET Core 3.1 and later.

Making answered 11/2, 2023 at 6:54 Comment(6)
Not really sure. But could it be that Parallel.ForEach spawns a new thread each time the thread on which it was created gets scheduled? That would explain the observed behavior to me. But why would it do that? For this we would have to deep dive into its implementation, I guess.Adal
On first glance, for a solution, I would say "don't do that", seems to be a bad combo. I would guess a dataflow solution would do a better job, perhaps.Adal
@Adal actually I used this combination yesterday, in this answer, for solving the problem of parallel hierarchical traversal. I was thinking that it was a good quality solution, until I discovered this issue. That answer is susceptible to the problem presented here. In case the root takes a lot of time to be processed, the Parallel.ForEach will be spawning threads every second during this time...Making
It's surprising, to say the least, yes. I personally wouldn't use it after having found what you have found. I just don't know if this is a bug or "works as designed".Adal
@Adal probably it works as designed. As shown in Shingo's answer, if I don't specify the MaxDegreeOfParallelism it's like asking the Parallel.ForEach to saturate my ThreadPool...Making
That's what I thought. And it kind of makes sense. But for this use case it's a bit unfortunate. I personally am not a fan of "unlimited" deg of concurrency, anyway but well... I am not everybody.Adal
Reason

The Parallel.ForEach method partitions the enumerable first, then it uses TaskReplicator.Run to run the action on each partition.

The actual runner is the method Replica.Execute. At the beginning of the method, because the enumerable blocks, the replica won't stop; it keeps creating new replicas of itself until _remainingConcurrency reaches 0.

The _remainingConcurrency field is initialized from the ParallelOptions.EffectiveMaxConcurrencyLevel option, whose default value is -1, meaning unlimited. That's why you saw the ThreadCount keep increasing.

Solution

Now that we know the reason, we just need to specify a reasonable option to limit the degree of parallelism.

var option = new ParallelOptions
{
    MaxDegreeOfParallelism = 5
};
Parallel.ForEach(queue.GetConsumingEnumerable(), option, _ => { });
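For completeness, here is a minimal self-contained sketch of the option in action (the producer task and the item count of 100 are illustrative, not from the original demo). With the limit in place, at most 5 worker tasks block on the empty collection instead of replicating without bound, and the loop still terminates normally once CompleteAdding is called:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        var queue = new BlockingCollection<int>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
        int processed = 0;

        // Producer: add 100 items, then signal that no more are coming.
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 100; i++) queue.Add(i);
            queue.CompleteAdding();
        });

        // At most 5 replicas exist at any moment, even while the
        // collection is temporarily empty.
        Parallel.ForEach(queue.GetConsumingEnumerable(), options,
            _ => Interlocked.Increment(ref processed));

        producer.Wait();
        Console.WriteLine(processed); // prints 100
    }
}
```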
Semidiurnal answered 11/2, 2023 at 9:34 Comment(3)
Thanks Shingo for the answer. Yes, I thought about specifying the MaxDegreeOfParallelism to reduce the damage, but I was hoping for a solution that would minimize the ThreadPool usage down to one thread. In case a perfect solution is not found, I'll accept your answer.Making
That is about what I have been suspecting. As Theodor said, I would consider setting a max rather a "patch" than a solution. It avoids the worst case of the behavior but it is not exactly what he wants. But I guess that's as good as you can get with that combination if you want to stick to it.Adal
I think you can just use a synchronous foreach over the BlockingCollection and create a task for each iteration; according to the code, there is not much difference unless the collection contains a mass of data.Semidiurnal
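The alternative suggested in the last comment could look roughly like the sketch below (the producer task and item count are illustrative). Only the single consumer thread blocks while the collection is empty; each item is handed off to a Task. Note that this launches one Task per item with no concurrency limit, so it trades the thread-growth problem for a lack of backpressure:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        var queue = new BlockingCollection<int>();
        int processed = 0;

        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++) queue.Add(i);
            queue.CompleteAdding();
        });

        // Single consumer thread: only this thread blocks on the
        // empty collection; the work itself runs on ThreadPool tasks.
        var tasks = new List<Task>();
        foreach (int item in queue.GetConsumingEnumerable())
        {
            tasks.Add(Task.Run(() => Interlocked.Increment(ref processed)));
        }
        Task.WaitAll(tasks.ToArray());

        producer.Wait();
        Console.WriteLine(processed); // prints 10
    }
}
```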

© 2022 - 2024 — McMap. All rights reserved.