await async lambda in ActionBlock
Asked Answered
A

1

6

I have a class Receiver with an ActionBlock:

public class Receiver<T> : IReceiver<T>
{

  private ActionBlock<T> _receiver;

  public Task<bool> Send(T item) 
  {
     if(_receiver!=null)
        return _receiver.SendAsync(item);

     //Do some other stuff her
  }

  public void Register (Func<T, Task> receiver)
  {
    _receiver = new ActionBlock<T> (receiver);
  }

  //...
}

The Register-Action for the ActionBlock is a async-Method with a await-Statement:

private static async Task Writer(int num)
{
   Console.WriteLine("start " + num);
   await Task.Delay(500);
   Console.WriteLine("end " + num);
}

Now what i want to do is to wait synchronously (if a condition is set) until the action method is finished to get an exclusive behavior:

var receiver = new Receiver<int>();
receiver.Register((Func<int, Task) Writer);
receiver.Send(5).Wait(); //does not wait the action-await here!

The Problem is when the "await Task.Delay(500);" statement is executed, the "receiver.Post(5).Wait();" does not wait anymore.

I have tried several variants (TaskCompletionSource, ContinueWith, ...) but it does not work.

Has anyone an idea how to solve the problem?

Afforest answered 4/12, 2012 at 13:30 Comment(2)
Couldn't you change your code by changing _receiver into a TransformBlock and put the following action into a new ActionBlock, linked to _receiver?Dor
Can you give me a small code example? I don't understand how that refactoring should solve by "exclusive behavior" problem.Afforest
T
4

ActionBlock by default will enforce exclusive behavior (only one item is processed at a time). If you mean something else by "exclusive behavior", you can use TaskCompletionSource to notify your sender when the action is complete:

... use ActionBlock<Tuple<int, TaskCompletionSource<object>>> and Receiver<Tuple<int, TaskCompletionSource<object>>>
var receiver = new Receiver<Tuple<int, TaskCompletionSource<object>>>();
receiver.Register((Func<Tuple<int, TaskCompletionSource<object>>, Task) Writer);
var tcs = new TaskCompletionSource<object>();
receiver.Send(Tuple.Create(5, tcs));
tcs.Task.Wait(); // if you must

private static async Task Writer(int num, TaskCompletionSource<object> tcs)
{
  Console.WriteLine("start " + num);
  await Task.Delay(500);
  Console.WriteLine("end " + num);
  tcs.SetResult(null);
}

Alternatively, you could use AsyncLock (included in my AsyncEx library):

private static AsyncLock mutex = new AsyncLock();

private static async Task Writer(int num)
{
  using (await mutex.LockAsync())
  {
    Console.WriteLine("start " + num);
    await Task.Delay(500);
    Console.WriteLine("end " + num);
  }
}
Tick answered 4/12, 2012 at 13:50 Comment(2)
yes you are right that the ActionBlock enforces exclusive behavior but if the registered actions are async its not "real exclusive" any more. yes your solution should work but I don't want to add a TaskCompletionSource-parameter because that action is the entry point for the exclusive logic - so if a user does not call tcs.SetResult it does not work anymore ...Afforest
In that case, you could use AsyncLock. See updated answer for a code sample. You no longer know when an item is done processing, but each item will be processed one at a time (including async processing).Tick

© 2022 - 2024 — McMap. All rights reserved.