Workaround for the WaitHandle.WaitAll 64 handle limit?

My application spawns loads of different small worker threads via ThreadPool.QueueUserWorkItem which I keep track of via multiple ManualResetEvent instances. I use the WaitHandle.WaitAll method to block my application from closing until these threads have completed.

I have never had any issues before. However, as my application comes under more load (i.e. more threads are being created), I am now beginning to get this exception:

WaitHandles must be less than or equal to 64 - missing documentation

What is the best alternative solution to this?

Code Snippet

List<AutoResetEvent> events = new List<AutoResetEvent>();

// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
    // do work
    evt.Set();
});

...
WaitHandle.WaitAll(events.ToArray());

Workaround

int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);

...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
    try
    {
         // do work
    }
    finally
    {
        if (Interlocked.Decrement(ref threadCount) == 0)
        {
             finished.Set();
        }
    }
});

...
finished.WaitOne();
Puerility answered 23/4, 2010 at 23:31 Comment(1)
If and when you move to .NET 4, take a look at CountdownEvent. It wraps up the counting in a tidy package. - Va
T
48

Create a variable that keeps track of the number of running tasks:

int numberOfTasks = 100;

Create a signal:

ManualResetEvent signal = new ManualResetEvent(false);

Decrement the number of tasks whenever a task is finished:

if (Interlocked.Decrement(ref numberOfTasks) == 0)
{

If there is no task remaining, set the signal:

    signal.Set();
}

Meanwhile, somewhere else, wait for the signal to be set:

signal.WaitOne();
Tribe answered 23/4, 2010 at 23:35 Comment(8)
@dtb: So rather than maintaining a list of MREs, just maintain the count, decrement it after each thread finishes, and trigger the signal when the count reaches 0? - Puerility
Exactly. MREs are somewhat heavyweight, so the 64 limit has a reason. Try to avoid too many of them :-) - Tribe
Excellent, just one more thing. I don't know the exact number of threads because the list is built up dynamically. Is it safe to increment the count inside each new thread before any work is performed and then decrement at the end? - Puerility
No, don't increment numberOfTasks in the thread. That could lead to a situation where numberOfTasks reaches 0 but not all tasks have been started yet. Increment numberOfTasks right before your call to ThreadPool.QueueUserWorkItem, so it's guaranteed that every task has been counted before you call signal.WaitOne(). Use Interlocked.Increment to increment numberOfTasks. - Tribe
@dtb: I was worried about what happens if I increment numberOfTasks and, for whatever reason, the thread fails to start. Wouldn't that mean the signal never gets set because numberOfTasks never gets decremented? Not really sure of the likelihood of this happening though... - Puerility
Any code in your task (// do work) can throw an exception. So you should make sure that, if an exception is thrown, numberOfTasks is still decremented (try ... finally). Don't worry that your UserWorkItem might not be executed - it will. Eventually. - Tribe
@dtb: Actually just did that! Haha, will post the code I intend to use, thanks. - Puerility
So increment numberOfTasks before the QueueUserWorkItem, but where would you decrement it? I am guessing inside the thread? - Corporeal
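To make the last question concrete, here is a minimal end-to-end sketch of the counting pattern discussed in the comments above. The class name PendingWorkDemo, the method RunAll, and the "completed" counter are made up for illustration and are not from the original posts. The counter starts at 1 (an assumption borrowed from the "treat the queueing thread as a work item" idea raised elsewhere in this thread) so it cannot reach zero while items are still being queued. The increment happens on the queueing thread and the decrement happens inside the work item, in a finally block.

```csharp
using System;
using System.Threading;

public static class PendingWorkDemo
{
    // Queues `count` work items and blocks until all of them have finished.
    // Returns how many work items actually ran (illustrative only).
    public static int RunAll(int count)
    {
        int pending = 1;   // starts at 1: the queueing thread holds one slot
        int completed = 0; // stand-in for "do work"

        using (var finished = new ManualResetEvent(false))
        {
            for (int i = 0; i < count; i++)
            {
                // Increment BEFORE queueing, on the queueing thread.
                Interlocked.Increment(ref pending);
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        Interlocked.Increment(ref completed); // do work
                    }
                    finally
                    {
                        // Decrement inside the work item, even if it throws.
                        if (Interlocked.Decrement(ref pending) == 0)
                            finished.Set();
                    }
                });
            }

            // Release the queueing thread's own slot, then wait.
            if (Interlocked.Decrement(ref pending) == 0)
                finished.Set();
            finished.WaitOne();
        }

        return completed;
    }
}
```

Calling PendingWorkDemo.RunAll(100) uses a single wait handle regardless of how many items are queued, so the 64-handle limit never comes into play.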
K
44

Starting with .NET 4.0, you have two more (and IMO, cleaner) options available to you.

The first is to use the CountdownEvent class. It removes the need to handle the incrementing and decrementing on your own:

int tasks = ...; /* however many tasks you're performing */

// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
    // Queue work.
    ThreadPool.QueueUserWorkItem(_ => {
        // Do work
        ...

        // Signal when done.
        e.Signal();
    });

    // Wait till the countdown reaches zero.
    e.Wait();
}

However, there's an even more robust solution, and that's to use the Task class, like so:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Factory.StartNew(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

Using the Task class and the call to WaitAll is much cleaner, IMO, as you're weaving fewer threading primitives throughout your code (notice, no wait handles). You don't have to set up a counter or handle incrementing/decrementing; you just set up your tasks and then wait on them. This lets the code be more expressive about what you want to do rather than the primitives of how (at least in terms of managing the parallelization of it).
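As a rough illustration that the Task approach is not subject to the 64-handle limit, the sketch below starts 200 tasks and waits on all of them. The names TaskWaitDemo and SumWithTasks are hypothetical, and adding each index to a shared total is just a stand-in for real work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskWaitDemo
{
    // Starts `count` tasks, each adding its index to a shared total,
    // waits for all of them, and returns the total.
    public static int SumWithTasks(int count)
    {
        int total = 0;

        Task[] tasks = Enumerable.Range(0, count)
            .Select(i => Task.Factory.StartNew(() =>
            {
                // Stand-in for real work.
                Interlocked.Add(ref total, i);
            }))
            .ToArray();

        // No wait handles, no manual counter.
        Task.WaitAll(tasks);
        return total;
    }
}
```

With count = 200 the method returns 0 + 1 + ... + 199 = 19900, and no WaitHandle array is ever built.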

.NET 4.5 offers even more options. You can simplify the generation of the sequence of Task instances by calling the static Run method on the Task class:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Run(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

Or, you could take advantage of the TPL Dataflow library (it's in the System namespace, so it's official, even though it's a download from NuGet, like Entity Framework) and use an ActionBlock<TInput>, like so:

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
    // Do work.
});

// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);

// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();

// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();

Note that the ActionBlock<TInput> by default processes one item at a time, so if you want it to process multiple items concurrently, you have to set the number of concurrent items in the constructor by passing an ExecutionDataflowBlockOptions instance and setting the MaxDegreeOfParallelism property:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

If your action is truly thread safe, then you can set the MaxDegreeOfParallelism property to DataflowBlockOptions.Unbounded:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});

The point being, you have fine-grained control over how parallel you want your actions to be.

Of course, if you have a sequence of items that you want passed into your ActionBlock<TInput> instance, then you can link an ISourceBlock<TOutput> implementation to feed the ActionBlock<TInput>, like so:

// The buffer block.
var buffer = new BufferBlock<int>();

// Create the action block.  TInput is int here because the
// buffer block feeds it int items.
var actionBlock = new ActionBlock<int>(o => {
    // Do work.
});

// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (buffer.LinkTo(actionBlock, 
    // Want to propagate completion state to the action block.
    new DataflowLinkOptions {
        PropagateCompletion = true,
    },
    // Can filter on items flowing through if you want.
    i => true))
{ 
    // Post 100 times to the *buffer*
    foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);

    // Signal complete on the *buffer*; because PropagateCompletion is
    // set, the action block completes once the buffered items have
    // been handed over and processed.
    buffer.Complete();

    // Wait for everything to complete, the Completion property
    // exposes a Task which can be waited on.
    actionBlock.Completion.Wait();
}

Depending on what you need to do, the TPL Dataflow library becomes a much more attractive option, in that it handles the concurrency across all the tasks linked together, and it allows you to be very specific about just how parallel you want each piece to be, while maintaining proper separation of concerns for each block.

Kentiga answered 14/10, 2012 at 17:17 Comment(0)
M
18

Your workaround is not correct. The reason is that the Set and WaitOne could race if the last work item causes the threadCount to go to zero before the queueing thread has had a chance to queue all work items. The fix is simple. Treat your queueing thread as if it were a work item itself. Initialize threadCount to 1 and do a decrement and signal when the queueing is complete.

int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount); 
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
        if (Interlocked.Decrement(ref threadCount) == 0) 
        { 
             finished.Set(); 
        } 
    } 
}); 
... 
if (Interlocked.Decrement(ref threadCount) == 0)
{
  finished.Set();
}
finished.WaitOne(); 

As a personal preference I like using the CountdownEvent class to do the counting for me.

var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
      finished.Signal();
    } 
}); 
... 
finished.Signal();
finished.Wait(); 
Mouldon answered 1/10, 2010 at 17:31 Comment(0)
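For anyone who wants a version of the CountdownEvent variant that compiles as-is, here is a small sketch. CountdownDemo, RunAll, and the "completed" counter are hypothetical names used only for illustration. The event starts at 1 for the queueing thread, each queued item adds one count, and the final Signal from the queueing thread releases its own slot.

```csharp
using System;
using System.Threading;

public static class CountdownDemo
{
    // Queues `count` work items and waits for all of them using a
    // single CountdownEvent. Returns how many items ran.
    public static int RunAll(int count)
    {
        int completed = 0; // stand-in for "do work"

        // Initial count of 1 represents the queueing thread itself.
        using (var finished = new CountdownEvent(1))
        {
            for (int i = 0; i < count; i++)
            {
                finished.AddCount(); // register before queueing
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        Interlocked.Increment(ref completed); // do work
                    }
                    finally
                    {
                        finished.Signal(); // always signal, even on failure
                    }
                });
            }

            finished.Signal(); // queueing is done: release our own slot
            finished.Wait();   // returns once the count reaches zero
        }

        return completed;
    }
}
```

Because the count can only reach zero after the queueing thread's own Signal, Wait cannot return while items are still being queued.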
N
6

Adding to dtb's answer you can wrap this into a nice simple class.

public class Countdown : IDisposable
{
    private readonly ManualResetEvent done;
    private readonly int total;
    private long current;

    public Countdown(int total)
    {
        this.total = total;
        current = total;
        done = new ManualResetEvent(false);
    }

    public void Signal()
    {
        if (Interlocked.Decrement(ref current) == 0)
        {
            done.Set();
        }
    }

    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
}
Nordrheinwestfalen answered 24/4, 2010 at 1:11 Comment(0)
O
0

Adding to dtb's answer when we want to have callbacks.

using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Main m = new Main();
        m.TestMRE();
        Console.ReadKey();

    }
}

class Main
{
    CalHandler handler = new CalHandler();
    int numberofTasks =0;
    public void TestMRE()
    {

        for (int j = 0; j <= 3; j++)
        {
            Console.WriteLine("Outer Loop is :" + j.ToString());
            ManualResetEvent signal = new ManualResetEvent(false);
            numberofTasks = 4;
            for (int i = 0; i <= 3; i++)
            {
                CalHandler.count caller = new CalHandler.count(handler.messageHandler);
                caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
            }
            signal.WaitOne();
        }

    }

    private void NumberCallback(IAsyncResult result)
    {
        AsyncResult asyncResult = (AsyncResult)result;

        CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;

        int num = caller.EndInvoke(asyncResult);

        Console.WriteLine("Number is :"+ num.ToString());

        ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
        if (Interlocked.Decrement(ref numberofTasks) == 0)
        {
            mre.Set();
        }
    }

}
public class CalHandler
{
    public delegate int count(int number);

    public int messageHandler ( int number )
    {
        return number;
    }

}
Oat answered 24/2, 2016 at 22:14 Comment(1)
Please ignore the naming conventions, it's just quick code. - Oat
C
0
protected void WaitAllExt(WaitHandle[] waitHandles)
{
    //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
    const int waitAllArrayLimit = 64;
    var prevEndInd = -1;
    while (prevEndInd < waitHandles.Length - 1)
    {
        var stInd = prevEndInd + 1;
        var eInd = stInd + waitAllArrayLimit - 1;
        if (eInd > waitHandles.Length - 1)
        {
            eInd = waitHandles.Length - 1;
        }
        prevEndInd = eInd;

        //do wait
        var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
        WaitHandle.WaitAll(whSubarray);
    }

}
Currycomb answered 12/12, 2016 at 12:50 Comment(0)
M
0

I solved it by simply paginating the events to wait on, without much performance loss, and it's working perfectly in a production environment. The code follows:

        var events = new List<ManualResetEvent>();

        // code omitted

        var newEvent = new ManualResetEvent(false);
        events.Add(newEvent);
        ThreadPool.QueueUserWorkItem(c => {

            //thread code
            newEvent.Set();
        });

        // code omitted

        var wait = true;
        while (wait)
        {
            WaitHandle.WaitAll(events.Take(60).ToArray());
            events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
            wait = events.Any();

        }
Medial answered 16/8, 2017 at 20:13 Comment(0)
R
0

Here is another solution. Here, "events" is a list of ManualResetEvent instances. The size of the list can be greater than 64 (MAX_EVENTS_NO).

int len = events.Count;
if (len <= MAX_EVENTS_NO)
{
    WaitHandle.WaitAll(events.ToArray());
}
else
{
    int start = 0;
    int num = MAX_EVENTS_NO;
    while (true)
    {
        if (start + num > len)
        {
            num = len - start;
        }
        List<ManualResetEvent> sublist = events.GetRange(start, num);
        WaitHandle.WaitAll(sublist.ToArray());
        start += num;
        if (start >= len)
            break;
    }
}

Relucent answered 26/9, 2019 at 15:8 Comment(0)
