How to wait for a single event in C#, with timeout and cancellation
Asked Answered
R

4

22

So my requirement is to have my function wait for the first instance an event Action<T> coming from another class and another thread, and handle it on my thread, allowing the wait to be interrupted by either timeout or CancellationToken.

I want to create a generic function I can reuse. I managed to create a couple options that do (I think) what I need, but both seem more complicated than I'd imagine it should have to be.

Usage

Just to be clear, a sample use of this function would look like this, where serialDevice is spitting out events on a separate thread:

var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
    cancellationToken,
    statusPacket => OnStatusPacketReceived(statusPacket),
    a => serialDevice.StatusPacketReceived += a,
    a => serialDevice.StatusPacketReceived -= a,
    5000,
    () => serialDevice.RequestStatusPacket());

Option 1—ManualResetEventSlim

This option isn't bad, but the Dispose handling of the ManualResetEventSlim is messier than it seems like it should be. It gives ReSharper fits that I'm accessing modified/disposed things within the closure, and it's genuinely hard to follow so I'm not even sure it's correct. Maybe there's something I'm missing that can clean this up, which would be my preference, but I don't see it offhand. Here's the code.

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var eventOccurred = false;
    var eventResult = default(TEvent);
    var o = new object();
    var slim = new ManualResetEventSlim();
    Action<TEvent> setResult = result => 
    {
        lock (o) // ensures we get the first event only
        {
            if (!eventOccurred)
            {
                eventResult = result;
                eventOccurred = true;
                // ReSharper disable AccessToModifiedClosure
                // ReSharper disable AccessToDisposedClosure
                if (slim != null)
                {
                    slim.Set();
                }
                // ReSharper restore AccessToDisposedClosure
                // ReSharper restore AccessToModifiedClosure
            }
        }
    };
    subscribe(setResult);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        slim.Wait(msTimeout, token);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(setResult);
        lock(o) // ensure we don't access slim
        {
            slim.Dispose();
            slim = null;
        }
    }
    lock (o) // ensures our variables don't get changed in middle of things
    {
        if (eventOccurred)
        {
            handler(eventResult);
        }
        return eventOccurred;
    }
}

Option 2—polling without a WaitHandle

The WaitForSingleEvent function here is much cleaner. I'm able to use ConcurrentQueue and thus don't even need a lock. But I just don't like the polling function Sleep, and I don't see any way around it with this approach. I'd like to pass in a WaitHandle instead of a Func<bool> to clean up Sleep, but the second I do that I've got the whole Dispose mess to clean up again.

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new ConcurrentQueue<TEvent>();
    subscribe(q.Enqueue);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        token.Sleep(msTimeout, () => !q.IsEmpty);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(q.Enqueue);
    }
    TEvent eventResult;
    var eventOccurred = q.TryDequeue(out eventResult);
    if (eventOccurred)
    {
        handler(eventResult);
    }
    return eventOccurred;
}

public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
    var start = DateTime.Now;
    while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
    {
        token.ThrowIfCancellationRequested();
        Thread.Sleep(1);
    }
}

The question

I don't particularly care for either of these solutions, nor am I 100% sure either of them are 100% correct. Is either one of these solutions better than the other (idiomaticity, efficiency, etc), or is there an easier way or built-in function to meet what I need to do here?

Update: Best answer so far

A modification of the TaskCompletionSource solution below. No long closures, locks, or anything required. Seems pretty straightforward. Any errors here?

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var tcs = new TaskCompletionSource<TEvent>();
    Action<TEvent> handler = result => tcs.TrySetResult(result);
    var task = tcs.Task;
    subscribe(handler);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        task.Wait(msTimeout, token);
    }
    finally
    {
        unsubscribe(handler);
        // Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
    }
    if (task.Status == TaskStatus.RanToCompletion)
    {
        onEvent(task.Result);
        return true;
    }
    return false;
}

Update 2: Another great solution

Turns out that BlockingCollection works just like ConcurrentQueue but also has methods accepting a timeout and cancellation token. One nice thing about this solution is that it can be updated to make a WaitForNEvents fairly easily:

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new BlockingCollection<TEvent>();
    Action<TEvent> add = item => q.TryAdd(item);
    subscribe(add);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        TEvent eventResult;
        if (q.TryTake(out eventResult, msTimeout, token))
        {
            handler(eventResult);
            return true;
        }   
        return false;
    }
    finally
    {
        unsubscribe(add);
        q.Dispose();
    }
}
Reprieve answered 14/7, 2013 at 0:30 Comment(7)
It sounds like you want something like AutoResetEvent. Have you looked at the possibility of using it?Aikens
@KendallFrey Yes, that seems to get me into the same Dispose mess that ManualResetEventSlim does, or did you have a way around it?Reprieve
The reason that Resharper complains is because it can't analyze the flow since the subscribe and unsubscribe actions are passed in. It may stop complaining if you passed in the event (via reflection) or somehow made the flow more verifiable. In any case, I personally don't hold Resharper warnings in high regard.Aikens
None of this makes any sense. There is no way that you can intercept an event like DataReceived, fired on a threadpool thread, if you cannot control the event source. Nor does it make any sense to have it fire just once, it only means that calling Read() isn't going block. It makes no promise whatsoever that you get everything you want to read.Conveyor
@HansPassant Can you clarify this? Both of these options have worked without error (so far) when I've been testing them. I don't even have a Read function in the question, so I'm not sure what you mean. serialDevice.RequestStatusPacket() sends the command to my device to respond with a status packet, and I'm needing to wait for the response, which comes on a different thread, with timeout or cancellation, and handle the event on my own thread. Not sure why this doesn't make sense.Reprieve
@Reprieve the updated 'best so far' example is totally great, thank you for that. However, for some reason when I invoke the event, only one waiting thread handler is invoked. (e.g. if I have 20 threads all using the sample you provided hooked to ONE event, I have to get 20 events for each thread to get it's handler invoked).Hedgehog
@Hedgehog static event Action<string> Event;static void Main() {var tokenSrc = new CancellationTokenSource();new Thread(() => {var x = WaitForSingleEvent<string>(tokenSrc.Token,Console.WriteLine,a => Event += a,a => Event -= a,5000);Console.WriteLine(x);}).Start();new Thread(() => {var x = WaitForSingleEvent<string>(tokenSrc.Token,Console.WriteLine,a => Event += a,a => Event -= a,5000);Console.WriteLine(x);}).Start();new Thread(() => {Thread.Sleep(300);Event("Asdf");}).Start();Console.ReadLine();} works for me.Reprieve
C
2

You can use Rx to convert the event to an observable, then to a task, and finally wait on that task with your token/timeout.

One advantage this has over any of the existing solutions, is that it calls unsubscribe on the event's thread, ensuring that your handler won't be called twice. (In your first solution you work around this by tcs.TrySetResult instead of tcs.SetResult, but it's always nice to get rid of a "TryDoSomething" and simply ensure DoSomething always works).

Another advantage is the code's simplicity. It's essentially one line. So you don't even particularly need an independent function. You can inline it so that it's more clear what exactly your code does, and you can make variations on the theme without needing a ton of optional parameters (like your optional initializer, or allow waiting on N events, or foregoing timeouts/cancellation in instances where they're not necessary). And you'd have both the bool return val and the actual result in scope when it's finished, if that's useful at all.

using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
...
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null) {
    var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask();
    if (initializer != null) {
        initializer();
    }
    try {
        var finished = task.Wait(msTimeout, token);
        if (finished) onEvent(task.Result);
        return finished;
    } catch (OperationCanceledException) { return false; }
}
Citizenry answered 9/1, 2016 at 1:31 Comment(0)
B
6

You can use TaskCompletetionSource to create a Task that you can mark as completed or cancelled. Here's a possible implementation for a specific event:

public Task WaitFirstMyEvent(Foo target, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        target.MyEvent -= handler;
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        target.MyEvent -= handler;
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    target.MyEvent += handler;
    return tcs.Task;
}

In C# 5 you can use it like this:

private async Task MyMethod()
{
    ...
    await WaitFirstMyEvent(foo, cancellationToken);
    ...
}

If you want to wait for the event synchronously, you can also use the Wait method:

private void MyMethod()
{
    ...
    WaitFirstMyEvent(foo, cancellationToken).Wait();
    ...
}

Here's a more generic version, but it still works only for events with Action signature:

public Task WaitFirstEvent(
    Action<Action> subscribe,
    Action<Action> unsubscribe,
    CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        unsubscribe(handler);
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        unsubscribe(handler);
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    subscribe(handler);
    return tcs.Task;
}

You can use it like this:

await WaitFirstEvent(
        handler => foo.MyEvent += handler,
        handler => foo.MyEvent -= handler,
        cancellationToken);

If you want it to work with other event signatures (e.g. EventHandler), you will have to create separate overloads. I don't think there's an easy way to make it work for any signature, especially since the number of parameters isn't always the same.

Blockage answered 14/7, 2013 at 1:2 Comment(4)
I added an update to the question—a possible solution using your examples as a starting point. I don't have any experience with TaskCompletionSource or really Task in general. Do you see any glaring errors in the solution? (It's .Net 4.0 and a desktop app so holding the thread with task.Wait is not an issue.)Reprieve
@lob, well, your code doesn't support cancellation. Also, it doesn't make sense to test the status of the task: if it reaches that point, the status can't be anything but RanToCompletion, otherwise an exception would have bubbled upBlockage
Note I'm using the task.Wait overload that takes a timeout and a CancellationToken. It seems to do the job. If the token is canceled when Wait is called, then it throws OperationCancellationException and if it times out, then the Status remains TaskStatus.WaitingForActivation.Reprieve
@lobsterism, yes, I don't know how I missed the use of the CancellationToken... however you're not catching the exception, so it will just propagate to the caller before you had a chance to check the status of the taskBlockage
C
2

You can use Rx to convert the event to an observable, then to a task, and finally wait on that task with your token/timeout.

One advantage this has over any of the existing solutions, is that it calls unsubscribe on the event's thread, ensuring that your handler won't be called twice. (In your first solution you work around this by tcs.TrySetResult instead of tcs.SetResult, but it's always nice to get rid of a "TryDoSomething" and simply ensure DoSomething always works).

Another advantage is the code's simplicity. It's essentially one line. So you don't even particularly need an independent function. You can inline it so that it's more clear what exactly your code does, and you can make variations on the theme without needing a ton of optional parameters (like your optional initializer, or allow waiting on N events, or foregoing timeouts/cancellation in instances where they're not necessary). And you'd have both the bool return val and the actual result in scope when it's finished, if that's useful at all.

using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
...
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null) {
    var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask();
    if (initializer != null) {
        initializer();
    }
    try {
        var finished = task.Wait(msTimeout, token);
        if (finished) onEvent(task.Result);
        return finished;
    } catch (OperationCanceledException) { return false; }
}
Citizenry answered 9/1, 2016 at 1:31 Comment(0)
U
1

Why not just use ManualResetEventSlim.Wait (int millisecondsTimeout, CancellationToken cancellationToken) ?

Ulema answered 31/5, 2019 at 14:7 Comment(0)
D
0

many thanks! for helping other to understand... (maybe showing serialdevice code with hits action handler code)

you could also put a generic type constrain adding something like

 where TEvent : EventArgs

in my case i also need the result out of event in the "waiter"
so i changed signature like
(fast and ugly on a generic object...)

 public static bool WaitForSingleEventWithResult<TEvent, TObjRes>(
            this CancellationToken token,
            Func<TEvent, TObjRes> onEvent,
             ...

calling it in this way

        var ct = new CancellationToken();
        object result;
        bool eventOccurred = ct.WaitForSingleEventWithResult<MyEventArgs, object>(
            onEvent: statusPacket => result = this.OnStatusPacketReceived(statusPacket),
            subscribe: sub => cp.StatusPacketReceived_Action += sub,
            unsubscribe: unsub => cp.StatusPacketReceived_Action -= unsub,
            msTimeout: 5 * 1000,
            initializer: /*() => serialDevice.RequestStatusPacket()*/null);

anyway... many thanks!

Danyelldanyelle answered 7/5, 2019 at 10:11 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.