How do I create a custom SynchronizationContext so that all continuations can be processed by my own single-threaded event loop?
Asked Answered
M

2

22

Say you're writing a custom single threaded GUI library (or anything with an event loop). From my understanding, if I use async/await, or just regular TPL continuations, they will all be scheduled on TaskScheduler.Current (or on SynchronizationContext.Current).

The problem is that the continuation might want to access the single threaded parts of the library, which means it has to execute in the same event loop. For example, given a simple game loop, the events might be processed like this:

// All continuation calls should be put onto this queue
Queue<Event> events;

// The main thread calls the `Update` method continuously on each "frame"
void Update() {
    // All accumulated events are processed in order and the queue is cleared
    foreach (var event : events) Process(event);

    events.Clear();
}

Now given my assumption is correct and TPL uses the SynchronizationContext.Current, any code in the application should be able to do something like this:

async void Foo() {
    someLabel.Text = "Processing";

    await BackgroundTask();

    // This has to execute on the main thread
    someLabel.Text = "Done";
}

Which brings me to the question. How do I implement a custom SynchronizationContext that would allow me to handle continuations on my own thread? Is this even the correct approach?

Minion answered 1/9, 2016 at 12:23 Comment(0)
M
19

Implementing a custom SynchronizationContext is not the easiest thing in the world. I have an open-source single-threaded implementation here that you can use as a starting point (or possibly just use in place of your main loop).

By default, AsyncContext.Run takes a single delegate to execute and returns when it is fully complete (since AsyncContext uses a custom SynchronizationContext, it is able to wait for async void methods as well as regular async/sync code).

AsyncContext.Run(async () => await DoSomethingAsync());

If you want more flexibility, you can use the AsyncContext advanced members (these do not show up in IntelliSense but they are there) to keep the context alive until some external signal (like "exit frame"):

using (var context = new AsyncContext())
{
  // Ensure the context doesn't exit until we say so.
  context.SynchronizationContext.OperationStarted();

  // TODO: set up the "exit frame" signal to call `context.SynchronizationContext.OperationCompleted()`
  // (note that from within the context, you can alternatively call `SynchronizationContext.Current.OperationCompleted()`

  // Optional: queue any work you want using `context.Factory`.

  // Run the context; this only returns after all work queued to this context has completed and the "exit frame" signal is triggered.
  context.Execute();
}

AsyncContext's Run and Execute replace the current SynchronizationContext while they are running, but they save the original context and set that as current before returning. This allows them to work nicely in a nested fashion (e.g., "frames").

(I'm assuming by "frame" you mean a kind of WPF-like dispatcher frame).

Mccauley answered 1/9, 2016 at 13:24 Comment(5)
Looks like you might be solving a more complicated problem than mine. I'm not doing any nesting, by frame I meant just a single call to render, rendering a single frame on a canvas. I wouldn't have any other synchronization context, just the one for the whole program. Which I guess should simplify things a bit?Minion
@JakubArnold: Yes, you can just ignore the nesting part then. You'd still need to use OperationStarted to keep the main loop going until you get an "exit program" request.Mccauley
But that's assuming the SynchronizationContext runs the loop, right? If I wanted to manually call onto the context to process its queue, wouldn't it be enough to override just Post to store into a queue?Minion
SynchronizationContext never runs a loop;. If you want to build your own, you should override both Post and Send.Mccauley
It seems like your Github project moved to github.com/StephenCleary/AsyncExEnchase
G
1

Although this question asks about a custom SynchronizationContext, the same functionality can be accomplished with a custom TaskScheduler. The await keyword captures either the SynchronizationContext.Current or the TaskScheduler.Current, and the continuation after the await will be send to the captured scheduler. The SynchronizationContext has a behavioral advantage over the TaskScheduler that I'll explain later, but a custom TaskScheduler can be implemented more easily. So let's see such an implementation:

public class SyncronizationTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue = new();
    private int _pendingCount = 1; // Represents the completion of masterTask

    public static void Run(Func<Task> action)
    {
        ArgumentNullException.ThrowIfNull(action);
        using SyncronizationTaskScheduler scheduler = new();

        Task<Task> masterTaskTask = new(action, TaskCreationOptions.DenyChildAttach);
        Task masterTask = masterTaskTask.Unwrap();
        masterTaskTask.Start(scheduler);

        // The masterTask cannot be completed at this point.
        scheduler.HandleTaskCompletion(masterTask);

        foreach (Task task in scheduler._queue.GetConsumingEnumerable())
            scheduler.TryExecuteTask(task);

        masterTask.Wait(); // Propagate all exceptions
    }

    private SyncronizationTaskScheduler() { } // Prevent public instantiation

    private void HandleTaskCompletion(Task task)
    {
        _ = task.ContinueWith(t => ErrorOnThreadPool(() =>
        {
            int pending = Interlocked.Decrement(ref _pendingCount);
            Debug.Assert(pending >= 0);
            if (pending == 0) _queue.CompleteAdding();
        }), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }

    private void ErrorOnThreadPool(Action action)
    {
        try { action(); }
        catch (Exception ex)
        {
            ThreadPool.QueueUserWorkItem(_ => throw new TaskSchedulerException(ex));
        }
    }

    protected override void QueueTask(Task task)
    {
        ErrorOnThreadPool(() =>
        {
            int pending = Interlocked.Increment(ref _pendingCount);
            Debug.Assert(pending > 0);
            HandleTaskCompletion(task);
            _queue.Add(task);
        });
    }

    protected override bool TryExecuteTaskInline(Task task,
        bool taskWasPreviouslyQueued) => false;

    public override int MaximumConcurrencyLevel => 1;
    protected override IEnumerable<Task> GetScheduledTasks() => _queue;

    public void Dispose() => _queue.Dispose();
}

Usage example:

SyncronizationTaskScheduler.Run(async () =>
{
    await Task.Delay(500);
    Console.WriteLine($"After await on thread #{Thread.CurrentThread.ManagedThreadId}");
    await Task.Delay(500);
    Console.WriteLine($"After await on thread #{Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine("SyncronizationTaskScheduler.Run completed");

Output:

After await on thread #1
After await on thread #1
SyncronizationTaskScheduler.Run completed

Online demo

The SyncronizationTaskScheduler.Run is a blocking call. It blocks the current thread until the completion of the asynchronous action delegate. The reason that the current thread must be blocked is because we need this thread to be available to run all the await continuations that have captured our custom scheduler. As you can see from the output of the program, both await continuations have ran on the thread #1, which is the main thread of the console application. The main thread was temporarily unblocked to run each continuation, and then blocked again until all the synchronous and asynchronous work initiated by the action was completed.

Concurrent operations are supported. You can launch many tasks inside the action, and all the await continuations of all the tasks will run on the current thread. Make sure to await all these tasks before exiting the action though.

The custom TaskScheduler above holds the scheduled tasks in a BlockingCollection<Task> queue, much like Stephen Cleary's custom SynchronizationContext implementation (AsyncContext).

One interesting point of the above implementation is the ErrorOnThreadPool method. The QueueTask method of the TaskScheduler has the habit of suppressing any exceptions that are thrown inside the body of the method, in which case the scheduler will just stop working and the SyncronizationTaskScheduler.Run would never return. The solution is to funnel any exception to the ThreadPool, so that the program can terminate with an unhandled exception (can be observed with the AppDomain.CurrentDomain.UnhandledException event). Such an exception can only be raised when the action creates tasks and loses track of them, letting them become fire-and-forget. When these tasks try to schedule an await continuation after the completion of the SyncronizationTaskScheduler.Run, they'll find that the BlockingCollection<Task> has been disposed, resulting to an ObjectDisposedException. This is an incorrect usage of the scheduler. All tasks created by the action are expected to be awaited by the action itself, otherwise an unhandled ObjectDisposedException will be raised, and will crash the process. Which is bad, but arguably better than letting the process hang indefinitely without any error message whatsoever.

The behavioral disadvantage that the SyncronizationTaskScheduler has compared to Stephen Cleary's AsyncContext, is that it can't track the completion of any async void operations launched by the action. The SynchronizationContext has an OperationCompleted method that is invoked appropriately by .NET's async infrastructure, and the TaskScheduler has nothing like this. So if you are dealing with async void operations, your only option is use a custom SynchronizationContext implementation.

Also be aware that any await configured with ConfigureAwait(false) inside the action will not capture the scheduler, and the continuation will run on some other thread instead of the current thread. So use ConfigureAwait(false) only when you know for sure that there is no thread-affine code after the await.

Globuliferous answered 18/3 at 9:39 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.