How to run an async delegate on captured ExecutionContext

As Stephen Toub explained in this post, when you submit a message to an ActionBlock, you can call ExecutionContext.Capture before calling ActionBlock.Post, pass a DTO holding both the message and the ExecutionContext into the block, and then use ExecutionContext.Run inside the message-handling delegate to run the handler on the captured context:

public sealed class ContextFlowProcessor<T> {
    private struct MessageState {
        internal ExecutionContext Context;
        internal T Value;
    }

    private readonly ITargetBlock<MessageState> m_block;

    public ContextFlowProcessor(Action<T> action) {
        m_block = new ActionBlock<MessageState>(ms =>
        {
            if (ms.Context != null)
                using (ms.Context) ExecutionContext.Run(ms.Context, s => action((T)s), ms.Value);
            else 
                action(ms.Value);
        });
    }

    public bool Post(T item) {
        var ec = ExecutionContext.Capture();
        var rv = m_block.Post(new MessageState { Context = ec, Value = item });
        if (!rv) ec.Dispose();
        return rv;
    }

    public void Done() { m_block.DeclinePermanently(); }

    public Task CompletionTask { get { return m_block.CompletionTask; } }
}

This works well when the logic inside the message handler is synchronous. But how can I run a piece of async logic on the captured ExecutionContext? I need something like this:

m_block = new ActionBlock<MessageState>(async ms =>
{
      // omitting the null context situation for brevity
      using (ms.Context)
      {
         await ExecutionContext.Run(ms.Context, async _ => { callSomethingAsync(ms.Value) });
      }
});

Obviously, this doesn't compile because ExecutionContext.Run does not support asynchronous delegates (while ActionBlock does) - so how can I do this?

Gytle answered 14/8, 2019 at 19:58 Comment(4)
The link provided refers to a pre-release version of TPL Dataflow, which is not compatible with the current API (for example, the DeclinePermanently method and the CompletionTask property). So the information provided may not be accurate. My experiments indicate that the ExecutionContext is captured by default, and that the ContextFlowProcessor behaves the same as a simple ActionBlock. Do you have an example that shows a difference? Shovelboard
Not really, no. I looked at the current source code, and it captures the ExecutionContext (implicitly) only when it starts a new task, which happens when you post the first message, or the first after a long gap, but not for subsequent messages queued to the running task loop. Gytle
As explained here, the async/await keywords use ExecutionContext behind the scenes. async/await is just infrastructure that helps simulate synchronous semantics in asynchronous programming, so when you use ExecutionContext directly, you are handling things manually. I don't think it makes sense for ExecutionContext.Run to support async/await, since async/await is itself built on ExecutionContext.Run. Ceram
What about using Task.Run? Shaylynn

If you can provide a self-contained example so we could try to repro the problem, we might be able to provide a better answer. That said, it's possible to manually control the flow of ExecutionContext (or rather, a copy of it) across await continuations, using a simple custom synchronization context. Here is an example (warning - almost untested!):

// using EcFlowingSynchronizationContext:

m_block = new ActionBlock<MessageState>(async ms =>
{
      using (ms.Context)
      using (var sc = new EcFlowingSynchronizationContext(ms.Context))
      {
         await sc.Run(async () => { await callSomethingAsync(ms.Value); });
      }
});

// EcFlowingSynchronizationContext: flow execution context manually 

public class EcFlowingSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly ExecutionContext _ec;
    private readonly TaskScheduler _taskScheduler;

    public EcFlowingSynchronizationContext(ExecutionContext sourceEc) 
    {
        TaskScheduler ts = null;
        ExecutionContext ec = null;

        ExecutionContext.Run(sourceEc, _ =>
        {
            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(this);
            try
            {
                ts = TaskScheduler.FromCurrentSynchronizationContext();
                // this will also capture SynchronizationContext.Current,
                // and it will be flown by subsequent ExecutionContext.Run
                ec = ExecutionContext.Capture();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }, null);

        _ec = ec;
        _taskScheduler = ts;
    }

    private void Execute(SendOrPostCallback d, object state)
    {
        using (var ec = _ec.CreateCopy())
        {
            ExecutionContext.Run(ec, new ContextCallback(d), state);
        }
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        ThreadPool.UnsafeQueueUserWorkItem(s => Execute(d, s), state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        Execute(d, state);
    }

    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public void Dispose()
    {
        _ec.Dispose();
    }
}

Note that you should only store immutable values with CallContext.LogicalSetData (or AsyncLocal<T>). That is, if you need to store something that may change during an asynchronous flow from a callee to the caller, and you need to track that change in the caller, make it a property of a class and then store an instance of that class. Make sure that class is thread-safe as well, because you can eventually end up with many concurrent forks of the original execution context.
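
To illustrate the point about callee-to-caller changes, here is a minimal sketch. The names AsyncLocalFlowDemo, Counter, Name and Shared are hypothetical and exist only for this illustration. It assumes the usual async method behavior, where an assignment to AsyncLocal<T>.Value made inside a callee is not visible to the caller after the await, while a mutation of a shared instance is:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLocalFlowDemo
{
    // Hypothetical thread-safe holder; the AsyncLocal stores a reference to it.
    public sealed class Counter
    {
        private int _value;
        public int Value => Volatile.Read(ref _value);
        public void Increment() => Interlocked.Increment(ref _value);
    }

    private static readonly AsyncLocal<string> Name = new AsyncLocal<string>();
    private static readonly AsyncLocal<Counter> Shared = new AsyncLocal<Counter>();

    private static async Task CalleeAsync()
    {
        await Task.Yield();
        // Replaces the value in the callee's copy of the context only.
        Name.Value = "set by callee";
        // Mutates the instance that the caller's context also references.
        Shared.Value.Increment();
    }

    public static async Task<(string Name, int Count)> RunAsync()
    {
        Name.Value = "set by caller";
        Shared.Value = new Counter();

        await CalleeAsync();

        // The caller still sees its own Name, but observes the incremented counter.
        return (Name.Value, Shared.Value.Value);
    }
}
```

Calling AsyncLocalFlowDemo.RunAsync() should yield ("set by caller", 1): the reassignment made by the callee is lost, while the change made through the shared Counter instance is visible.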

For more details, refer to Stephen Cleary's excellent Implicit Async Context ("AsyncLocal") and "Eliding Async and Await".

Antipodal answered 26/8, 2019 at 22:39 Comment(10)
So basically, Task.Run with some extra details? Gytle
@zvolkov, no, it doesn't use Task.Run at all. I just made EcFlowingSynchronizationContext.Run look similar for convenience. Antipodal
I mean, your sc.Run uses Task.Factory.StartNew to do the part I was asking about (run the async code), and the way it runs it on the target EC is by getting a TaskScheduler via FromCurrentSynchronizationContext from inside my target EC.Run. But I wonder if I could just use Task.Factory.StartNew directly inside my target EC.Run, without creating this extra class? Gytle
@zvolkov what TaskScheduler are you going to provide if you call Task.Factory.StartNew like that? If you give it TaskScheduler.Default, you'll get the default EC propagation behavior, which you were not happy with in the first place. The whole point of the custom sync context and custom task scheduler was to alter how the EC gets propagated. Does this implementation actually work the way you wanted? Or am I missing something? Antipodal
No, just trying to understand the underlying principle. I will need to test this. Gytle
@zvolkov, actually I believe the default EC propagation behavior should be suitable enough; I don't think it needs customization. Check the last paragraph in my answer, maybe that's where you experience a problem in your code. Antipodal
I don't see how the last paragraph is relevant, and I did read both of those posts before. I'm trying to flow the context alongside the message I post to the ActionBlock and, as I said, AB only flows the EC implicitly when starting a new task for the message loop, which doesn't happen on every message. Gytle
Anyway, I think the key point of your answer is running the task on that special scheduler, which will somehow get the target EC to flow to my task, correct? Gytle
@zvolkov correct, but there's one thing. If you use a custom task scheduler without a custom synchronization context, the await continuations will only be scheduled by your custom scheduler if it's recognized as the currently active, "ambient" scheduler, see here. It's very easy for the inner calls to set their own task scheduler, e.g., Task.Run uses TaskScheduler.Default. So you can't rely on the scheduler staying the same all the way down. Antipodal
Then again, await task.ConfigureAwait(false) will get you off the current synchronization context, too. So really, there is no guarantee that this EC propagation logic will apply to all await continuations inside your async lambda, unless perhaps you wrap each of them with EcFlowingSynchronizationContext.Run, too. Antipodal

This seems to work for me. At least the AsyncLocal<> stuff from the original task is available in SomeMethodAsync as expected:

public static class ExecutionContextExtensions
{
    public static async Task<T> RunAsync<T>(this ExecutionContext executionContext, Func<Task<T>> asyncFunc, bool ownContext = false)
    {
        if(executionContext == null)
            return await asyncFunc();

        Task<T> task = null;
        using var ec = ownContext ? executionContext : executionContext.CreateCopy();
        ExecutionContext.Run(ec, _ => task = asyncFunc(), null);
        if (task == null)
            return default;
        return await task;
    }
    
    public static async Task RunAsync(this ExecutionContext executionContext, Func<Task> asyncAction, bool ownContext = false)
    {
        if (executionContext == null)
        {
            await asyncAction();
            return;
        }
        Task task = null;
        using var ec = ownContext ? executionContext : executionContext.CreateCopy();
        ExecutionContext.Run(ec, _ => task = asyncAction(), null);
        if (task == null)
            return;
        await task;
    }
}

Usage:

 var ec = ExecutionContext.Capture();
 var result = await ec.RunAsync(() => SomeMethodAsync());
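
One way to check the AsyncLocal<> claim is a small self-contained sketch like the following. CapturedContextDemo, Ambient, StartOn and ReadAmbientAsync are hypothetical names introduced for this illustration; StartOn uses the same idea as RunAsync above (start the async delegate inside ExecutionContext.Run, then await the returned task outside it) but is not the same code. It assumes no ConfigureAwait or scheduler tricks interfere with the default flow of the context across awaits:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CapturedContextDemo
{
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // Starts asyncFunc inside the captured context. The async method captures
    // the then-current context at each await, so its continuations see it too.
    private static Task<T> StartOn<T>(ExecutionContext context, Func<Task<T>> asyncFunc)
    {
        Task<T> task = null;
        ExecutionContext.Run(context, _ => task = asyncFunc(), null);
        return task;
    }

    private static async Task<string> ReadAmbientAsync()
    {
        await Task.Delay(10);   // force a real continuation
        return Ambient.Value;
    }

    public static async Task<(string Flowed, string Plain)> RunAsync()
    {
        Ambient.Value = "producer";
        ExecutionContext captured = ExecutionContext.Capture();

        // Simulate code that later runs under a different ambient value.
        Ambient.Value = "consumer";

        string flowed = await StartOn(captured, () => ReadAmbientAsync());
        string plain = await ReadAmbientAsync();
        return (flowed, plain);
    }
}
```

CapturedContextDemo.RunAsync() should yield ("producer", "consumer"): the delegate started on the captured context reads the value that was current at capture time, even after an await, while a plain call reads the current one.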
Sedgewick answered 14/5 at 4:54 Comment(1)
This page is the first Google result for "executioncontext run async", and this is technically an answer to the OP's question. I'll just remove my "invitation for comments" from the answer. I'm sure people will point out all the problems anyway, if there are any :) Sedgewick
