A reusable pattern to convert an event into a task
I'd like to have a generic, reusable piece of code for wrapping the event-based asynchronous pattern (EAP) as a task, similar to what Task.Factory.FromAsync does for the BeginXXX/EndXXX APM pattern.

E.g.:

private async void Form1_Load(object sender, EventArgs e)
{
    await TaskExt.FromEvent<EventArgs>(
        handler => this.webBrowser.DocumentCompleted += 
            new WebBrowserDocumentCompletedEventHandler(handler),
        () => this.webBrowser.Navigate("about:blank"),
        handler => this.webBrowser.DocumentCompleted -= 
            new WebBrowserDocumentCompletedEventHandler(handler),
        CancellationToken.None);

    this.webBrowser.Document.InvokeScript("setTimeout", 
        new[] { "document.body.style.backgroundColor = 'yellow'", "1" });
}

So far, it looks like this:

public static class TaskExt
{
    public static async Task<TEventArgs> FromEvent<TEventArgs>(
        Action<EventHandler<TEventArgs>> registerEvent,
        Action action,
        Action<EventHandler<TEventArgs>> unregisterEvent,
        CancellationToken token)
    {
        var tcs = new TaskCompletionSource<TEventArgs>();

        EventHandler<TEventArgs> handler = (sender, args) =>
            tcs.TrySetResult(args);

        registerEvent(handler);
        try
        {
            using (token.Register(() => tcs.TrySetCanceled()))
            {
                action();
                return await tcs.Task;
            }
        }
        finally
        {
            unregisterEvent(handler);
        }
    }
}

Is it possible to come up with something similar, which nevertheless would not require me to type WebBrowserDocumentCompletedEventHandler twice (for registerEvent/unregisterEvent), without resorting to reflection?

Bakke answered 1/4, 2014 at 10:53 Comment(2)
possible duplicate of General purpose FromEvent method - Adduct
I posted my solution without using any reflection here - Outride
A
12

It is possible with a helper class and a fluent-like syntax:

public static class TaskExt
{
    public static EAPTask<TEventArgs, EventHandler<TEventArgs>> FromEvent<TEventArgs>()
    {
        var tcs = new TaskCompletionSource<TEventArgs>();
        var handler = new EventHandler<TEventArgs>((s, e) => tcs.TrySetResult(e));
        return new EAPTask<TEventArgs, EventHandler<TEventArgs>>(tcs, handler);
    }
}


public sealed class EAPTask<TEventArgs, TEventHandler>
    where TEventHandler : class
{
    private readonly TaskCompletionSource<TEventArgs> _completionSource;
    private readonly TEventHandler _eventHandler;

    public EAPTask(
        TaskCompletionSource<TEventArgs> completionSource,
        TEventHandler eventHandler)
    {
        _completionSource = completionSource;
        _eventHandler = eventHandler;
    }

    public EAPTask<TEventArgs, TOtherEventHandler> WithHandlerConversion<TOtherEventHandler>(
        Converter<TEventHandler, TOtherEventHandler> converter)
        where TOtherEventHandler : class
    {
        return new EAPTask<TEventArgs, TOtherEventHandler>(
            _completionSource, converter(_eventHandler));
    }

    public async Task<TEventArgs> Start(
        Action<TEventHandler> subscribe,
        Action action,
        Action<TEventHandler> unsubscribe,
        CancellationToken cancellationToken)
    {
        subscribe(_eventHandler);
        try
        {
            using (cancellationToken.Register(() => _completionSource.TrySetCanceled()))
            {
                action();
                return await _completionSource.Task;
            }
        }
        finally
        {
            unsubscribe(_eventHandler);
        }
    }
}

The WithHandlerConversion helper method infers its type parameter from the converter argument, so you need to write WebBrowserDocumentCompletedEventHandler only once. Usage:

await TaskExt
    .FromEvent<WebBrowserDocumentCompletedEventArgs>()
    .WithHandlerConversion(handler => new WebBrowserDocumentCompletedEventHandler(handler))
    .Start(
        handler => this.webBrowser.DocumentCompleted += handler,
        () => this.webBrowser.Navigate(@"about:blank"),
        handler => this.webBrowser.DocumentCompleted -= handler,
        CancellationToken.None);
Axum answered 1/4, 2014 at 13:6 Comment(0)
B
4

I think the following version might be satisfactory enough. I did borrow the idea of preparing a correctly typed event handler from max's answer, but this implementation doesn't create any additional object explicitly.

As a positive side effect, it allows the caller to cancel or reject the result of the operation (with an exception), based upon the event's arguments (like AsyncCompletedEventArgs.Cancelled, AsyncCompletedEventArgs.Error).

The underlying TaskCompletionSource is still completely hidden from the caller (so it could be replaced with something else, e.g. a custom awaiter or a custom promise):

private async void Form1_Load(object sender, EventArgs e)
{
    await TaskExt.FromEvent<WebBrowserDocumentCompletedEventHandler, EventArgs>(
        getHandler: (completeAction, cancelAction, rejectAction) => 
            (eventSource, eventArgs) => completeAction(eventArgs),
        subscribe: eventHandler => 
            this.webBrowser.DocumentCompleted += eventHandler,
        unsubscribe: eventHandler => 
            this.webBrowser.DocumentCompleted -= eventHandler,
        initiate: (completeAction, cancelAction, rejectAction) =>
            this.webBrowser.Navigate("about:blank"),
        token: CancellationToken.None);

    this.webBrowser.Document.InvokeScript("setTimeout", 
        new[] { "document.body.style.backgroundColor = 'yellow'", "1" });
}

public static class TaskExt
{
    public static async Task<TEventArgs> FromEvent<TEventHandler, TEventArgs>(
        Func<Action<TEventArgs>, Action, Action<Exception>, TEventHandler> getHandler,
        Action<TEventHandler> subscribe,
        Action<TEventHandler> unsubscribe,
        Action<Action<TEventArgs>, Action, Action<Exception>> initiate,
        CancellationToken token = default) where TEventHandler : Delegate
    {
        var tcs = new TaskCompletionSource<TEventArgs>();

        Action<TEventArgs> complete = args => tcs.TrySetResult(args);
        Action cancel = () => tcs.TrySetCanceled();
        Action<Exception> reject = ex => tcs.TrySetException(ex);

        TEventHandler handler = getHandler(complete, cancel, reject);

        subscribe(handler);
        try
        {
            using (token.Register(() => tcs.TrySetCanceled(),
                useSynchronizationContext: false))
            {
                initiate(complete, cancel, reject);
                return await tcs.Task;
            }
        }
        finally
        {
            unsubscribe(handler);
        }
    }
}

This can actually be used to await any callback, not just event handlers, e.g.:

var mre = new ManualResetEvent(false);
RegisteredWaitHandle rwh = null;

await TaskExt.FromEvent<WaitOrTimerCallback, bool>(
    (complete, cancel, reject) => 
        (state, timeout) => { if (!timeout) complete(true); else cancel(); },
    callback => 
        rwh = ThreadPool.RegisterWaitForSingleObject(mre, callback, null, 1000, true),
    callback => 
        rwh.Unregister(mre),
    (complete, cancel, reject) => 
        ThreadPool.QueueUserWorkItem(state => { Thread.Sleep(500); mre.Set(); }),
    CancellationToken.None);

Updated, less boilerplate for a simple event case (I use this one more often these days):

public static async Task<TEventArgs> FromEvent<TEventHandler, TEventArgs>(
    Action<TEventHandler> subscribe,
    Action<TEventHandler> unsubscribe,
    CancellationToken token = default,
    bool runContinuationsAsynchronously = true) 
        where TEventHandler : Delegate
        where TEventArgs: EventArgs
{
    var tcs = new TaskCompletionSource<TEventArgs>(runContinuationsAsynchronously ?
        TaskCreationOptions.RunContinuationsAsynchronously :
        TaskCreationOptions.None);

    var handler = new Action<object?, TEventArgs>((_, args) => tcs.TrySetResult(args)); 
    var h = (TEventHandler)Delegate.CreateDelegate(typeof(TEventHandler), handler.Target, handler.Method);

    subscribe(h);
    try
    {
        using (token.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: false))
        {
            return await tcs.Task;
        }
    }
    finally
    {
        unsubscribe(h);
    }
}

Usage:

await TaskExt.FromEvent<FormClosedEventHandler, FormClosedEventArgs>(
    h => mainForm.FormClosed += h,
    h => mainForm.FormClosed -= h,
    token);
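
The runContinuationsAsynchronously flag above decides whether the code awaiting the task may resume inline on the thread that raises the event. The following is only a minimal sketch of that difference, not part of the helper itself: ContinuationDemo and ResumesOnCompletingThread are hypothetical names, and a dedicated thread stands in for the event source.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Completes a TaskCompletionSource from a dedicated thread and reports whether
    // the awaiting code resumed on that same thread, i.e. inline inside TrySetResult.
    public static bool ResumesOnCompletingThread(TaskCreationOptions options)
    {
        var tcs = new TaskCompletionSource<int>(options);
        int resumedOn = 0;

        async Task WaitAsync()
        {
            // ConfigureAwait(false): do not capture a SynchronizationContext,
            // so only the TaskCreationOptions decide where we resume.
            await tcs.Task.ConfigureAwait(false);
            resumedOn = Environment.CurrentManagedThreadId;
        }

        Task waiter = WaitAsync();

        int completedOn = 0;
        var completer = new Thread(() =>
        {
            completedOn = Environment.CurrentManagedThreadId;
            tcs.TrySetResult(42); // stands in for the event handler firing
        });
        completer.Start();
        completer.Join();
        waiter.Wait();

        return resumedOn == completedOn;
    }
}
```

With TaskCreationOptions.None I would expect the continuation to run inside the event handler's call to TrySetResult, so slow or throwing continuation code would run on the event source's thread. With RunContinuationsAsynchronously it should be queued elsewhere, which is why the helper defaults to true.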
Bakke answered 1/4, 2014 at 23:24 Comment(0)
I
4

I have a solution that is much shorter at the call site. I will show the usage first and then the code that makes it work (use it freely).

Usage, e.g. (the event-args type has to be given explicitly because it cannot be inferred from the arguments; CustomerCreatedEventArgs and ServiceCompletedEventArgs stand in for whatever event-args types those events use):

await button.EventAsync<EventArgs>(nameof(button.Click));

or:

var specialEventArgs = await businessObject.EventAsync<CustomerCreatedEventArgs>(nameof(businessObject.CustomerCreated));

or for events that need to be triggered in some way:

var serviceResult = await service.EventAsync<ServiceCompletedEventArgs>(() => service.Start(), nameof(service.Completed));

The magic that makes this happen (beware, it's C# 7.1 syntax, but it can easily be converted back to lower language versions by adding a few lines):

using System;
using System.Threading;
using System.Threading.Tasks;

namespace SpacemonsterIndustries.Core
{
    public static class EventExtensions
    {
        /// <summary>
        /// Extension Method that converts a typical EventArgs Event into an awaitable Task 
        /// </summary>
        /// <typeparam name="TEventArgs">The type of the EventArgs (must inherit from EventArgs)</typeparam>
        /// <param name="objectWithEvent">the object that has the event</param>
        /// <param name="trigger">optional Function that triggers the event</param>
        /// <param name="eventName">the name of the event -> use nameof to be safe, e.g. nameof(button.Click) </param>
        /// <param name="ct">an optional Cancellation Token</param>
        /// <returns></returns>
        public static async Task<TEventArgs> EventAsync<TEventArgs>(this object objectWithEvent, Action trigger, string eventName, CancellationToken ct = default)
            where TEventArgs : EventArgs
        {
            // Note: passing ct to the TaskCompletionSource constructor would only store it as
            // state, so cancellation is wired up explicitly below.
            var completionSource = new TaskCompletionSource<TEventArgs>();
            var eventInfo = objectWithEvent.GetType().GetEvent(eventName);
            var delegateDef = new UniversalEventDelegate<TEventArgs>(Handler);
            var handlerAsDelegate = Delegate.CreateDelegate(eventInfo.EventHandlerType, delegateDef.Target, delegateDef.Method);

            eventInfo.AddEventHandler(objectWithEvent, handlerAsDelegate);
            try
            {
                using (ct.Register(() => completionSource.TrySetCanceled(ct)))
                {
                    trigger?.Invoke();
                    return await completionSource.Task;
                }
            }
            finally
            {
                // Always unsubscribe, even if the trigger throws or the wait is cancelled.
                eventInfo.RemoveEventHandler(objectWithEvent, handlerAsDelegate);
            }

            // TrySetResult: the event may fire again before the handler is removed.
            void Handler(object sender, TEventArgs e) => completionSource.TrySetResult(e);
        }

        public static Task<TEventArgs> EventAsync<TEventArgs>(this object objectWithEvent, string eventName, CancellationToken ct = default) where TEventArgs : EventArgs
            => EventAsync<TEventArgs>(objectWithEvent, null, eventName, ct);

        private delegate void UniversalEventDelegate<in TEventArgs>(object sender, TEventArgs e) where TEventArgs : EventArgs;
    }
}
Irresolution answered 19/7, 2017 at 11:0 Comment(3)
+ for the efforts, indeed this might be simpler usage-wise, but I don't like losing type safety to reflection (i.e., object objectWithEvent and string eventName). - Bakke
Extensions to object should be avoided, plain static methods would do the job, too. - Epiglottis
Downvoted for excessive var usage - Walker
B
3

Converting from EAP to Tasks is not that straightforward, mainly because you have to handle exceptions both when calling the long-running method and when handling the event.

The ParallelExtensionsExtras library contains the EAPCommon.HandleCompletion<T>(TaskCompletionSource<T> tcs, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler) helper method to make the conversion easier. The method handles subscribing/unsubscribing from an event. It doesn't try to start the long-running operation as well.

Using this method, the library implements asynchronous versions of SmtpClient, WebClient and Ping.

The following method shows the general usage pattern:

    private static Task<PingReply> SendTaskCore(Ping ping, object userToken, Action<TaskCompletionSource<PingReply>> sendAsync) 
    { 
        // Validate we're being used with a real Ping instance.  The rest of the arg validation 
        // will happen in the call to sendAsync. 
        if (ping == null) throw new ArgumentNullException("ping"); 

        // Create a TaskCompletionSource to represent the operation 
        var tcs = new TaskCompletionSource<PingReply>(userToken); 

        // Register a handler that will transfer completion results to the TCS Task 
        PingCompletedEventHandler handler = null; 
        handler = (sender, e) => EAPCommon.HandleCompletion(tcs, e, () => e.Reply, () => ping.PingCompleted -= handler); 
        ping.PingCompleted += handler; 

        // Try to start the async operation.  If starting it fails (due to parameter validation) 
        // unregister the handler before allowing the exception to propagate. 
        try 
        { 
            sendAsync(tcs); 
        } 
        catch(Exception exc) 
        { 
            ping.PingCompleted -= handler; 
            tcs.TrySetException(exc); 
        } 

        // Return the task to represent the asynchronous operation 
        return tcs.Task; 
    } 

The main difference from your code is here:

// Register a handler that will transfer completion results to the TCS Task 
PingCompletedEventHandler handler = null; 
handler = (sender, e) => EAPCommon.HandleCompletion(tcs, e, () => e.Reply, 
          () => ping.PingCompleted -= handler); 
ping.PingCompleted += handler; 

The extension method creates the handler and hooks the tcs. Your code sets the handler to the source object and starts the long operation. The actual handler type doesn't leak outside the method.

By separating the two concerns (handling the event vs starting the operation) it's easier to create a generic method.
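
To make the "handling the event" half concrete, here is a minimal sketch of what such a completion helper could look like. It is an approximation written for illustration, not the library's actual source: EapSketch and TransferCompletion are hypothetical names, and the check on UserState assumes the operation was started with the TaskCompletionSource as its user token.

```csharp
using System;
using System.ComponentModel;
using System.Threading.Tasks;

public static class EapSketch
{
    // Transfers the outcome of an EAP completion event to a TaskCompletionSource
    // and then unhooks the handler. Hypothetical helper, for illustration only.
    public static void TransferCompletion<T>(
        TaskCompletionSource<T> tcs,
        AsyncCompletedEventArgs e,
        Func<T> getResult,
        Action unregisterHandler)
    {
        // Assumption: the operation was started with tcs as its user token,
        // so events belonging to other pending operations are ignored.
        if (!ReferenceEquals(e.UserState, tcs)) return;

        if (e.Cancelled) tcs.TrySetCanceled();
        else if (e.Error != null) tcs.TrySetException(e.Error);
        else tcs.TrySetResult(getResult());

        unregisterHandler();
    }
}
```

The Try* methods are used so that a late or duplicate event cannot throw back into the event source. Note that getResult is only invoked on the success path, since an EAP result property typically throws when the operation failed or was cancelled.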

Broddie answered 1/4, 2014 at 11:21 Comment(2)
This is interesting, thanks. AFAIK, ParallelExtensionsExtras was created before async/await became available (which I'm taking advantage of for scoping the operation and propagating exceptions). This implementation indeed doesn't leak the handler outside, but the type PingCompletedEventHandler is hard-coded inside. I'm trying to avoid this to make it generic, so it can be used with any events and event sources. I can't see how to make it work without leaking the handler, although I'd love to. - Bakke
BTW, with async/await I don't need to handle exceptions inside the event handler. All the handler does is tcs.SetResult. If the continuation code throws, the exception won't be propagated to the caller of tcs.SetResult, i.e. to the event source. This is the desired behavior. - Bakke
L
1

Here is a solution that makes minimal use of reflection, inspired by the Observable.FromEvent method (Reactive Extensions).

public static Task<TEventArgs> TaskFromEvent<TDelegate, TEventArgs>(
    Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
    where TDelegate : Delegate where TEventArgs : EventArgs
{
    var tcs = new TaskCompletionSource<TEventArgs>();
    TDelegate specificHandler = null;
    Action<object, TEventArgs> handler = (sender, e) =>
    {
        removeHandler(specificHandler);
        handler = null;
        tcs.SetResult(e);
        tcs = null;
    };
    var invokeMethodInfo = typeof(Action<object, TEventArgs>).GetMethod("Invoke");
    specificHandler = (TDelegate)invokeMethodInfo
        .CreateDelegate(typeof(TDelegate), handler);
    addHandler(specificHandler);
    return tcs.Task;
}

Usage example:

var documentCompletedAsync = TaskFromEvent<
    WebBrowserDocumentCompletedEventHandler,
    WebBrowserDocumentCompletedEventArgs>(
    handler => webBrowser.DocumentCompleted += handler,
    handler => webBrowser.DocumentCompleted -= handler);
webBrowser.Navigate("about:blank");
var url = (await documentCompletedAsync).Url;
Lueluebke answered 4/11, 2019 at 6:58 Comment(0)
