Consider this example:
class Program
{
private static readonly ITargetBlock<string> Mesh = CreateMesh();
private static readonly AsyncLocal<string> AsyncLocalContext
= new AsyncLocal<string>();
static async Task Main(string[] args)
{
var tasks = Enumerable.Range(1, 4)
.Select(ProcessMessage);
await Task.WhenAll(tasks);
Mesh.Complete();
await Mesh.Completion;
Console.WriteLine();
Console.WriteLine("Done");
}
private static async Task ProcessMessage(int number)
{
var param = number.ToString();
using (SetScopedAsyncLocal(param))
{
Console.WriteLine($"Before send {param}");
await Mesh.SendAsync(param);
Console.WriteLine($"After send {param}");
}
}
private static IDisposable SetScopedAsyncLocal(string value)
{
AsyncLocalContext.Value = value;
return new Disposer(() => AsyncLocalContext.Value = null);
}
private static ITargetBlock<string> CreateMesh()
{
var blockOptions = new ExecutionDataflowBlockOptions
{
BoundedCapacity = DataflowBlockOptions.Unbounded,
EnsureOrdered = false,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
};
var block1 = new TransformBlock<string, string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block1 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");
return input;
}, blockOptions);
var block2 = new TransformBlock<string, string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block2 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");
return input;
}, blockOptions);
var block3 = new ActionBlock<string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block3 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");
}, blockOptions);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
block1.LinkTo(block2, linkOptions);
block2.LinkTo(block3, linkOptions);
return new EncapsulatedActionBlock<string>(block1, block3.Completion);
}
}
internal class EncapsulatedActionBlock<T> : ITargetBlock<T>
{
private readonly ITargetBlock<T> _wrapped;
public EncapsulatedActionBlock(ITargetBlock<T> wrapped, Task completion)
{
_wrapped = wrapped;
Completion = completion;
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept) =>
_wrapped.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
public void Complete() => _wrapped.Complete();
public void Fault(Exception exception) => _wrapped.Fault(exception);
public Task Completion { get; }
}
internal class Disposer : IDisposable
{
private readonly Action _disposeAction;
public Disposer(Action disposeAction)
{
_disposeAction = disposeAction
?? throw new ArgumentNullException(nameof(disposeAction));
}
public void Dispose()
{
_disposeAction();
}
}
The result of the execution will be something like:
Before send 1 After send 1 Before send 2 After send 2 Before send 3 After send 3 Before send 4 After send 4 Block1 [thread 9] Input: 3 - Context: 3. Block1 [thread 10] Input: 2 - Context: 1. Block1 [thread 8] Input: 4 - Context: 4. Block1 [thread 11] Input: 1 - Context: 2. Block2 [thread 9] Input: 2 - Context: 3. Block2 [thread 7] Input: 1 - Context: 2. Block2 [thread 10] Input: 3 - Context: 3. Block2 [thread 8] Input: 4 - Context: 4. Block3 [thread 11] Input: 4 - Context: 4. Block3 [thread 7] Input: 1 - Context: 2. Block3 [thread 9] Input: 3 - Context: 3. Block3 [thread 4] Input: 2 - Context: 3. Done
As you can see the passed context value and stored one's are not always the same after moving to second TDF block. This behavior screws up multiple Logging frameworks' LogContext feature usages.
- Is it an expected behavior (please explain why)?
- Does the TPL Dataflow messes up the execution context somehow?