Async/await tasks and WaitHandle
Say I have 10N items that I need to fetch over HTTP. The code starts N tasks to get the data, and each task fetches 10 items in sequence. I put the items in a ConcurrentQueue<Item>. After that, the items are processed one by one in a thread-unsafe method.

async Task<Item> GetItemAsync()
{
    //fetch one item from the internet
}

async Task DoWork()
{
    var tasks = new List<Task>();
    var items = new ConcurrentQueue<Item>();
    var handles = new List<ManualResetEvent>();

    for (int i = 0; i < N; i++)
    {
        var handle = new ManualResetEvent(false);
        handles.Add(handle);

        tasks.Add(Task.Factory.StartNew(async delegate
        {
            for (int j = 0; j < 10; j++)
            {
                var item = await GetItemAsync();
                items.Enqueue(item);
            }
            handle.Set();
        }));
    }

    //begin to process the items when any handle is set
    WaitHandle.WaitAny(handles.ToArray());

    while(true)
    {
         if (all handles are set && items collection is empty) //***
           break; 
         //in another word: all tasks are really completed

         while(items.TryDequeue(out item))          
         {
              AThreadUnsafeMethod(item);    //process items one by one
         }
    }
}

I don't know what condition to put in the if statement marked ***. I can't use the Task.IsCompleted property here: because the delegate uses await, the task returned by StartNew completes very soon, as soon as the delegate reaches its first await. A bool[] that records whether each task has run to the end looks really ugly, because I think ManualResetEvent can do the same job. Can anyone give me a suggestion?
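
For readers wondering why IsCompleted is unreliable here, a minimal sketch follows. It assumes a hypothetical class name (StartNewSketch) and uses a TaskCompletionSource as a stand-in for the pending HTTP call; neither is from the question. With an async delegate, Task.Factory.StartNew returns a Task<Task>, and the outer task finishes when the delegate hits its first await. Unwrap() gives a task that tracks the whole delegate.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewSketch
{
    // Returns (outer.IsCompleted, unwrapped.IsCompleted), both sampled
    // while the async delegate is still waiting at its first await.
    public static async Task<(bool, bool)> Probe()
    {
        // Stand-in for a pending HTTP call (an assumption of this sketch).
        var gate = new TaskCompletionSource<bool>();

        // With an async delegate, StartNew returns Task<Task>.
        Task<Task> outer = Task.Factory.StartNew(async () => { await gate.Task; });
        Task unwrapped = outer.Unwrap();

        await outer; // finishes once the delegate reaches its first await
        bool outerDone = outer.IsCompleted;
        bool unwrappedDone = unwrapped.IsCompleted; // gate is not open yet

        gate.SetResult(true); // let the delegate run to the end
        await unwrapped;
        return (outerDone, unwrappedDone);
    }
}
```

Task.Run does this unwrapping automatically for async delegates, so the task it returns completes only when the delegate has really finished.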

Peril answered 24/8, 2012 at 13:15 Comment(0)
I
6

Well, you could build this yourself, but I think it's tons easier with TPL Dataflow.

Something like:

static async Task DoWork()
{
  // By default, ActionBlock uses MaxDegreeOfParallelism == 1,
  //  so AThreadUnsafeMethod is not called in parallel.
  var block = new ActionBlock<Item>(AThreadUnsafeMethod);

  // Start off N tasks, each asynchronously acquiring 10 items.
  // Each item is sent to the block as it is received.
  var tasks = Enumerable.Range(0, N).Select(_ => Task.Run(
      async () =>
      {
        for (int i = 0; i != 10; ++i)
          block.Post(await GetItemAsync());
      })).ToArray();

  // Complete the block when all tasks have completed.
  Task.WhenAll(tasks).ContinueWith(_ => { block.Complete(); });

  // Wait for the block to complete.
  await block.Completion;
}
Incandescence answered 24/8, 2012 at 14:26 Comment(1)
Microsoft TPL Dataflow is now known as System.Threading.Tasks.Dataflow. - Ravenna
M
0

You can do a WaitOne with a timeout of zero to check the state. Something like this should work:

if (handles.All(handle => handle.WaitOne(TimeSpan.Zero)) && !items.Any())
    break;

http://msdn.microsoft.com/en-us/library/cc190477.aspx
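
A small sketch of the zero-timeout check in isolation, in case it helps. The class and method names (HandlePolling, AllSignaled, Demo) are made up for illustration; only WaitOne(TimeSpan.Zero) and ManualResetEvent come from the answer above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class HandlePolling
{
    // True only if every handle is currently signaled.
    // WaitOne(TimeSpan.Zero) tests the state and returns immediately.
    public static bool AllSignaled(IEnumerable<WaitHandle> handles) =>
        handles.All(h => h.WaitOne(TimeSpan.Zero));

    public static void Demo()
    {
        var handles = new List<ManualResetEvent>
        {
            new ManualResetEvent(false),
            new ManualResetEvent(false),
        };

        Console.WriteLine(AllSignaled(handles)); // False: nothing set yet
        handles[0].Set();
        Console.WriteLine(AllSignaled(handles)); // False: one handle still unset
        handles[1].Set();
        Console.WriteLine(AllSignaled(handles)); // True: all handles set
    }
}
```

This relies on ManualResetEvent staying signaled after a successful wait. An AutoResetEvent would be reset by WaitOne, so polling it this way would consume the signal.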

Mayamayakovski answered 24/8, 2012 at 13:41 Comment(3)
"Blocks the current thread until the current instance receives a signal", so you would have 10 blocked threads waiting for a signal. - Philippe
@Philippe "Remarks: If timeout is zero, the method does not block. It tests the state of the wait handle and returns immediately." - Immodest
@Immodest Okay, that's a detail I missed. Thanks, this is now my preferred solution for checking WaitHandles. - Philippe
P
0

Thanks, all. In the end I found that CountdownEvent is very suitable for this scenario. The general implementation looks like this (for others' information):

var countdown = new CountdownEvent(N);

for (int i = 0; i < N; i++)
{
    // start N tasks
    // invoke countdown.Signal() at the end of each task
}

// check countdown.IsSet here
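
A fuller sketch of the same idea, under stated assumptions: GetItemAsync here is a hypothetical stand-in that just delays and returns an int, the processed list stands in for AThreadUnsafeMethod, and the consumer loop is synchronous with a crude Thread.Sleep back-off. It is one way to wire this up, not necessarily the exact code used.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CountdownSketch
{
    // Hypothetical stand-in for the HTTP fetch in the question.
    static async Task<int> GetItemAsync(int id)
    {
        await Task.Delay(1);
        return id;
    }

    // Starts taskCount producers fetching perTask items each and returns
    // the items in the order the single consumer processed them.
    public static List<int> Run(int taskCount, int perTask)
    {
        var items = new ConcurrentQueue<int>();
        var processed = new List<int>(); // touched by the calling thread only

        using (var countdown = new CountdownEvent(taskCount))
        {
            for (int t = 0; t < taskCount; t++)
            {
                int offset = t * perTask;
                Task.Run(async () =>
                {
                    try
                    {
                        for (int j = 0; j < perTask; j++)
                            items.Enqueue(await GetItemAsync(offset + j));
                    }
                    finally
                    {
                        countdown.Signal(); // signal even if a fetch throws
                    }
                });
            }

            while (true)
            {
                // Sample IsSet before draining: anything enqueued before
                // the last Signal() is then seen by the drain below.
                bool producersDone = countdown.IsSet;

                while (items.TryDequeue(out int item))
                    processed.Add(item); // stands in for AThreadUnsafeMethod

                if (producersDone) break;
                Thread.Sleep(1); // crude back-off, an assumption of this sketch
            }
        }

        return processed;
    }
}
```

Calling CountdownSketch.Run(3, 10) should return 30 items. Reading IsSet before the drain, rather than after it, is what avoids missing items enqueued between the last dequeue and the final Signal().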
Peril answered 27/8, 2012 at 1:32 Comment(0)
