AsyncLocal values not correct with TPL Dataflow

Consider this example:

class Program
{
    private static readonly ITargetBlock<string> Mesh = CreateMesh();
    private static readonly AsyncLocal<string> AsyncLocalContext
        = new AsyncLocal<string>();

    static async Task Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 4)
            .Select(ProcessMessage);
        await Task.WhenAll(tasks);

        Mesh.Complete();
        await Mesh.Completion;

        Console.WriteLine();
        Console.WriteLine("Done");
    }

    private static async Task ProcessMessage(int number)
    {
        var param = number.ToString();
        using (SetScopedAsyncLocal(param))
        {
            Console.WriteLine($"Before send {param}");
            await Mesh.SendAsync(param);
            Console.WriteLine($"After send {param}");
        }
    }

    private static IDisposable SetScopedAsyncLocal(string value)
    {
        AsyncLocalContext.Value = value;

        return new Disposer(() => AsyncLocalContext.Value = null);
    }

    private static ITargetBlock<string> CreateMesh()
    {
        var blockOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = DataflowBlockOptions.Unbounded,
            EnsureOrdered = false,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var block1 = new TransformBlock<string, string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block1 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");

            return input;
        }, blockOptions);

        var block2 = new TransformBlock<string, string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block2 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");

            return input;
        }, blockOptions);

        var block3 = new ActionBlock<string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block3 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");
        }, blockOptions);

        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

        block1.LinkTo(block2, linkOptions);
        block2.LinkTo(block3, linkOptions);

        return new EncapsulatedActionBlock<string>(block1, block3.Completion);
    }
}

internal class EncapsulatedActionBlock<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T> _wrapped;

    public EncapsulatedActionBlock(ITargetBlock<T> wrapped, Task completion)
    {
        _wrapped = wrapped;
        Completion = completion;
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept) =>
        _wrapped.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

    public void Complete() => _wrapped.Complete();

    public void Fault(Exception exception) => _wrapped.Fault(exception);

    public Task Completion { get; }
}

internal class Disposer : IDisposable
{
    private readonly Action _disposeAction;

    public Disposer(Action disposeAction)
    {
        _disposeAction = disposeAction
            ?? throw new ArgumentNullException(nameof(disposeAction));
    }

    public void Dispose()
    {
        _disposeAction();
    }
}

The result of the execution will be something like:

Before send 1
After send 1
Before send 2
After send 2
Before send 3
After send 3
Before send 4
After send 4
   Block1 [thread 9] Input: 3 - Context: 3.
   Block1 [thread 10] Input: 2 - Context: 1.
   Block1 [thread 8] Input: 4 - Context: 4.
   Block1 [thread 11] Input: 1 - Context: 2.
   Block2 [thread 9] Input: 2 - Context: 3.
   Block2 [thread 7] Input: 1 - Context: 2.
   Block2 [thread 10] Input: 3 - Context: 3.
   Block2 [thread 8] Input: 4 - Context: 4.
   Block3 [thread 11] Input: 4 - Context: 4.
   Block3 [thread 7] Input: 1 - Context: 2.
   Block3 [thread 9] Input: 3 - Context: 3.
   Block3 [thread 4] Input: 2 - Context: 3.

Done

As you can see, the value passed as input and the value stored in the context are not always the same once a message moves to the second TDF block. This behavior breaks the LogContext feature of several logging frameworks.

  1. Is this expected behavior (please explain why)?
  2. Does TPL Dataflow mess up the execution context somehow?
Tetrabranchiate answered 1/10, 2019 at 6:44 Comment(0)

To understand what's going on, you need to understand how Dataflow blocks work. There are no blocked threads inside them waiting for messages to arrive; the processing is done by worker tasks. Let's consider the simple (and default) case of MaxDegreeOfParallelism = 1. Initially there are zero worker tasks. When a message is posted asynchronously with SendAsync, the workflow that posted the message effectively becomes the worker: the block starts a worker task from inside that call, so the task inherits the sender's execution context, and it starts processing the message. If another message is posted while the first is still being processed, something different happens: the second message is enqueued in the block's input queue, and the task that posted it completes. The second message will then be processed by the worker task that processed the first message. As long as there are messages in the queue, the initial worker task keeps picking them up and processing them one by one. If at some moment there are no more buffered messages, the worker task completes, and the block returns to its initial state (zero worker tasks). The next SendAsync then starts a new worker task, and so on. With MaxDegreeOfParallelism = 1, only one worker task can exist at any given moment.

Let's demonstrate this with an example. Below is an ActionBlock that is fed messages with a delay X between sends, and processes each message with a delay Y.

private static void ActionBlockTest(int sendDelay, int processDelay)
{
    Console.WriteLine($"SendDelay: {sendDelay}, ProcessDelay: {processDelay}");
    var asyncLocal = new AsyncLocal<int>();
    var actionBlock = new ActionBlock<int>(async i =>
    {
        await Task.Delay(processDelay);
        Console.WriteLine($"Processed {i}, Context: {asyncLocal.Value}");
    });
    Task.Run(async () =>
    {
        foreach (var i in Enumerable.Range(1, 5))
        {
            asyncLocal.Value = i;
            await actionBlock.SendAsync(i);
            await Task.Delay(sendDelay);
        }
    }).Wait();
    actionBlock.Complete();
    actionBlock.Completion.Wait();
}

Let's see what happens if we send the messages fast and process them slowly:

ActionBlockTest(100, 200); // .NET Core 3.0

SendDelay: 100, ProcessDelay: 200
Processed 1, Context: 1
Processed 2, Context: 1
Processed 3, Context: 1
Processed 4, Context: 1
Processed 5, Context: 1

The AsyncLocal context remained the same, because the same worker task processed all the messages.

Now let's send the messages slowly and process them fast:

ActionBlockTest(200, 100); // .NET Core 3.0

SendDelay: 200, ProcessDelay: 100
Processed 1, Context: 1
Processed 2, Context: 2
Processed 3, Context: 3
Processed 4, Context: 4
Processed 5, Context: 5

The AsyncLocal context is different for each message, because each message was processed by a different worker task.

The moral of the story is that a SendAsync call is not guaranteed to create a single asynchronous workflow that follows the message through its whole journey to the end of the pipeline. So the AsyncLocal class cannot be used to hold ambient data for each message.
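
One possible workaround, sketched here under assumptions rather than taken from the question, is to stop relying on ambient state and let the context travel with the message itself. The Envelope type, the EnvelopeDemo class and the RunAsync method below are hypothetical names introduced only for this illustration. The last block copies the carried value back into an AsyncLocal at the start of its delegate, which is roughly what a logging scope would need; because the delegate is an async method, that assignment stays local to the one invocation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical envelope: the payload plus the context that belongs to it.
public sealed class Envelope
{
    public Envelope(string payload, string context)
    {
        Payload = payload;
        Context = context;
    }

    public string Payload { get; }
    public string Context { get; }
}

public static class EnvelopeDemo
{
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // Sends the numbers 1..count through a two-block pipeline and returns
    // one "payload:context" string per processed message, sorted.
    public static async Task<string[]> RunAsync(int count)
    {
        var seen = new ConcurrentBag<string>();
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var first = new TransformBlock<Envelope, Envelope>(async msg =>
        {
            await Task.Yield();
            return msg; // the context is forwarded together with the payload
        }, options);

        var last = new ActionBlock<Envelope>(async msg =>
        {
            // Re-establish the ambient value from the message. The change is
            // scoped to this async delegate invocation and flows across awaits.
            Ambient.Value = msg.Context;
            await Task.Yield();
            seen.Add($"{msg.Payload}:{Ambient.Value}");
        }, options);

        first.LinkTo(last, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var n in Enumerable.Range(1, count))
        {
            var text = n.ToString();
            await first.SendAsync(new Envelope(text, text));
        }

        first.Complete();
        await last.Completion;
        return seen.OrderBy(s => s).ToArray();
    }
}
```

Calling EnvelopeDemo.RunAsync(4) should pair every payload with its own context regardless of which worker task processed it, since nothing depends on the execution context captured when the worker task was started.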

Coitus answered 1/10, 2019 at 13:1 Comment(2)
Hey @TheodorZoulias, thanks for your answer. It makes sense. By the way, I found a good discussion on the TDF CTP and ExecutionContext (somewhat old, but it might still be useful): Best way to pass CallContext in dataflow blocks - Tetrabranchiate
Yep, that discussion is very relevant! - Coitus
