TPL Dataflow: process messages from two incoming blocks sequentially

I'm working with TPL Dataflow now and I need to implement my own action block.

This action block should accept messages from two different input blocks, put them into a single queue, and then process this queue sequentially. The main point is that the two actions must never run concurrently, and I don't want to use locks.

Here is my solution but it doesn't work properly.

public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock
    where TInputLhs : class
    where TInputRhs : class
{
    public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } }
    public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } }


    private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>();
    private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>();

    private ITargetBlock<object> queue;

    public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs)
    {
        queue = new ActionBlock<object>(x =>
        {
            if (x is TInputLhs)
            {
                actionLhs(x as TInputLhs);
            }
            else
            {
                actionRhs(x as TInputRhs);
            }
        });

        inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
        inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    public void Complete()
    {
        queue.Complete();
    }

    public Task Completion
    {
        get { return queue.Completion; }
    }

    public void Fault(Exception exception)
    {
        queue.Fault(exception);
    }
}

Simple usage example:

static void Main(string[] args)
{
    var splitBlock = new SplitBlock<string>(new Predicate<string>(s => s.Length % 2 == 0));

    var batchBlock = new BatchBlock<string>(3);

    var processInOrderBlock = new OrderedActionBlock<string, string[]>(
        new Action<string>((str) =>
        {
            Console.WriteLine(str);
        }),
        new Action<string[]>((batch) =>
        {
            Console.WriteLine("BATCH - " + string.Join(", ", batch));
        }));

    splitBlock.SourceFiltered.LinkTo(processInOrderBlock.InputLhs, new DataflowLinkOptions() { PropagateCompletion = true });
    splitBlock.SourceNonFiltered.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    batchBlock.LinkTo(processInOrderBlock.InputRhs, new DataflowLinkOptions() { PropagateCompletion = true });

    for (int i = 1; i <= 10; i++)
    {
        splitBlock.Post(new string(Enumerable.Repeat('x', i).ToArray()));
    }

    splitBlock.Complete();

    processInOrderBlock.Completion.Wait();

    return;
}

The output:

xx
xxxx
xxxxxx
xxxxxxxx
xxxxxxxxxx
BATCH - x, xxx, xxxxx
Press any key to continue . . .

It looks like some messages get stuck in batchBlock, and I don't know why.

Loaves answered 20/9, 2016 at 11:9 Comment(9)
Why can't you just use one regular ActionBlock with a parallelism limit of one? You almost have that, just the limit is missing. - Splint
@usr, in that case I would have to move the code that checks the message type (single message or batch) into user code, whereas I want to keep such infrastructure code inside the library. Furthermore, I dislike an ActionBlock that accepts "some Object" (i.e. ActionBlock<object>) and prefer static typing. - Loaves
Can you say more clearly why you can't just set maxparallelism = 1? What does this have to do with batching? - Splint
@usr, ActionBlock cannot accept different messages from two different source blocks (Message from one and Message[] from another) and perform different operations on them. I need a custom block here. - Loaves
In your queue = new ActionBlock... line you are doing just that: creating an ActionBlock that accepts two different sources of different types. Now configure maxparallelism = 1 for that block. - Splint
The OrderedActionBlock class is OK to use as a wrapper, but it's not essential to solving the problem. A static helper method that creates the appropriate ActionBlock would do. - Splint
What I notice, too, is that completion is propagated twice, over the two branches. Maybe that's a problem. I have no experience with that. - Splint
@usr, maxparallelism = 1 is the default value, so there's no need to set it. As for the helper method, it's not a good solution in my opinion: it would return ActionBlock<object>, and users of the library wouldn't know which types this ActionBlock actually accepts. As for the PropagateCompletion option, maybe you're right; I'm trying to find a correct solution. - Loaves
Indeed, I did not know that the default maxp value is 1. How unexpected. So it looks like you want to provide a library and not just solve the concrete dataflow problem at hand. A custom block makes sense now. - Splint

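It looks like queue completes as soon as either inputLhs or inputRhs completes when the links are created with PropagateCompletion = true. A completed block declines any further messages, so whatever arrives later from the other input is never processed.

So we need to change this: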

inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });

to this:

Task.WhenAll(InputLhs.Completion, InputRhs.Completion)
    .ContinueWith(_ => queue.Complete());
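
To see the effect in isolation, here is a minimal sketch of the same idea outside the OrderedActionBlock class. The CompletionDemo class and its RunAsync method are hypothetical names used only for illustration. The two inputs are linked without PropagateCompletion, and the queue is completed only after both inputs have completed. Note that this sketch, like the snippet above, does not forward faults from the inputs to the queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Hypothetical helper for illustration; returns what the queue processed.
    public static async Task<List<string>> RunAsync()
    {
        var processed = new List<string>();

        // MaxDegreeOfParallelism defaults to 1, so items are handled one at a time.
        var queue = new ActionBlock<object>(x =>
        {
            if (x is string[] batch)
                processed.Add("BATCH - " + string.Join(", ", batch));
            else
                processed.Add((string)x);
        });

        var inputLhs = new BufferBlock<string>();
        var inputRhs = new BufferBlock<string[]>();

        // Default link options: PropagateCompletion is false, so neither
        // input can complete the queue on its own.
        inputLhs.LinkTo(queue, new DataflowLinkOptions());
        inputRhs.LinkTo(queue, new DataflowLinkOptions());

        // Complete the queue only after BOTH inputs have completed.
        _ = Task.WhenAll(inputLhs.Completion, inputRhs.Completion)
            .ContinueWith(t => queue.Complete());

        inputLhs.Post("xx");
        inputLhs.Complete();
        await inputLhs.Completion; // the left input is finished at this point

        // With PropagateCompletion = true on both links, the queue would
        // already be completed here and would decline this batch.
        inputRhs.Post(new[] { "x", "xxx", "xxxxx" });
        inputRhs.Complete();

        await queue.Completion;
        return processed;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
            Console.WriteLine(line);
    }
}
```

The batch is posted only after the left input has fully completed, which is exactly the situation in which the original links would have shut the queue down too early.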
Loaves answered 20/9, 2016 at 13:48

You could have a single ActionBlock that accepts a ValueTuple with two values, plus an index to indicate which of the two values is the valid one:

var block = new ActionBlock<(int, Type1, Type2)>(entry =>
{
    var (index, item1, item2) = entry;
    switch (index)
    {
        case 1: DoSomething1(item1); break;
        case 2: DoSomething2(item2); break;
        default: throw new NotImplementedException();
    }
});

block.Post((1, someValue1, default));
block.Post((2, default, someValue2));

This way, by getting rid of the two intermediary BufferBlocks, you can be sure that the order of processing will be exactly the same as the order of posting.
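
As a concrete illustration of the tuple approach, here is a small self-contained sketch that uses string and string[] in place of Type1 and Type2. The TupleDemo class, the RunAsync method, and the tuple element names are assumptions made for this example only. Items are recorded in a list so the processing order can be inspected.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TupleDemo
{
    // Hypothetical helper for illustration; returns items in processing order.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // A single block receives both kinds of message; Index says which
        // of the two payload slots is the valid one.
        var block = new ActionBlock<(int Index, string Single, string[] Batch)>(entry =>
        {
            switch (entry.Index)
            {
                case 1: log.Add(entry.Single); break;
                case 2: log.Add("BATCH - " + string.Join(", ", entry.Batch)); break;
                default: throw new NotImplementedException();
            }
        });

        // Everything goes through one queue, so posting order is processing order.
        block.Post((1, "xx", null));
        block.Post((2, null, new[] { "x", "xxx", "xxxxx" }));
        block.Post((1, "xxxx", null));

        block.Complete();
        await block.Completion;
        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
            Console.WriteLine(line);
    }
}
```

Because the block runs with the default MaxDegreeOfParallelism of 1, the list can be filled without any locking.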

To make it prettier and less error-prone, you could write a class similar to your OrderedActionBlock, but with "fake" ITargetBlock<TInputLhs> and ITargetBlock<TInputRhs> properties that are not true blocks, just propagator facades over the single ActionBlock. Converting from one ITargetBlock to another is a bit tricky, but it's doable. Below is a generic implementation. The ActionBlock<TInput1, TInput2> completes only after both its Target1 and Target2 have been completed, so propagating completion from linked sources should work as expected.

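using System;
using System.Linq; // for the All() call in Complete(int)
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ActionBlock<TInput1, TInput2> : IDataflowBlock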
{
    private readonly ITargetBlock<(int, TInput1, TInput2)> _actionBlock;

    public Task Completion => _actionBlock.Completion;
    public void Complete() => _actionBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _actionBlock.Fault(ex);

    public ITargetBlock<TInput1> Target1 { get; }
    public ITargetBlock<TInput2> Target2 { get; }

    public ActionBlock(Func<TInput1, Task> action1, Func<TInput2, Task> action2,
        ExecutionDataflowBlockOptions options = null)
    {
        if (action1 == null) throw new ArgumentNullException(nameof(action1));
        if (action2 == null) throw new ArgumentNullException(nameof(action2));
        options = options ?? new ExecutionDataflowBlockOptions();

        _actionBlock = new ActionBlock<(int, TInput1, TInput2)>(entry =>
        {
            var (index, item1, item2) = entry;
            return index switch // switch expression (C# 8.0 syntax)
            {
                1 => action1(item1),
                2 => action2(item2),
                _ => throw new NotImplementedException()
            };
        }, options);

        this.Target1 = new TargetConverter<TInput1, (int, TInput1, TInput2)>(
            _actionBlock, x => (1, x, default), () => Complete(1));

        this.Target2 = new TargetConverter<TInput2, (int, TInput1, TInput2)>(
            _actionBlock, x => (2, default, x), () => Complete(2));
    }

    // Constructor with synchronous lambdas
    public ActionBlock(Action<TInput1> action1, Action<TInput2> action2,
        ExecutionDataflowBlockOptions options = null) : this(
            item1 => { action1(item1); return Task.CompletedTask; },
            item2 => { action2(item2); return Task.CompletedTask; }, options) { }

    // Complete when both targets complete
    private readonly bool[] _completeState = new bool[2];
    private void Complete(int index)
    {
        bool completed;
        lock (_completeState.SyncRoot)
        {
            _completeState[index - 1] = true;
            completed = _completeState.All(v => v);
        }
        if (completed) _actionBlock.Complete();
    }
}

// Generic class for converting from one type of ITargetBlock to another
public class TargetConverter<TFrom, TTo> : ITargetBlock<TFrom>
{
    private readonly ITargetBlock<TTo> _parent;
    private readonly Func<TFrom, TTo> _convert;
    private readonly Action _completeAction;

    public TargetConverter(ITargetBlock<TTo> parent, Func<TFrom, TTo> convert,
        Action completeAction = null)
    {
        if (parent == null) throw new ArgumentNullException(nameof(parent));
        if (convert == null) throw new ArgumentNullException(nameof(convert));
        _parent = parent;
        _convert = convert;
        _completeAction = completeAction;
    }

    Task IDataflowBlock.Completion => _parent.Completion;
    void IDataflowBlock.Complete()
    {
        if (_completeAction != null) _completeAction(); else _parent.Complete();
    }
    void IDataflowBlock.Fault(Exception ex) => _parent.Fault(ex);

    DataflowMessageStatus ITargetBlock<TFrom>.OfferMessage(
        DataflowMessageHeader messageHeader, TFrom messageValue,
        ISourceBlock<TFrom> source, bool consumeToAccept)
    {
        return _parent.OfferMessage(messageHeader,
            _convert(messageValue),
            source != null ? new SourceProxy(source, this) : null,
            consumeToAccept);
    }

    // An internal ISourceBlock facade is also needed
    private class SourceProxy : ISourceBlock<TTo>
    {
        private readonly ISourceBlock<TFrom> _source;
        private readonly TargetConverter<TFrom, TTo> _target;

        public SourceProxy(ISourceBlock<TFrom> source,
            TargetConverter<TFrom, TTo> target)
        {
            _source = source;
            _target = target;
        }

        TTo ISourceBlock<TTo>.ConsumeMessage(
            DataflowMessageHeader messageHeader,
            ITargetBlock<TTo> target,
            out bool messageConsumed)
        {
            return _target._convert(_source.ConsumeMessage(messageHeader,
                _target, out messageConsumed));
        }

        bool ISourceBlock<TTo>.ReserveMessage(
            DataflowMessageHeader messageHeader,
            ITargetBlock<TTo> target)
        {
            return _source.ReserveMessage(messageHeader, _target);
        }

        void ISourceBlock<TTo>.ReleaseReservation(
            DataflowMessageHeader messageHeader,
            ITargetBlock<TTo> target)
        {
            _source.ReleaseReservation(messageHeader, _target);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception exception)
            => throw new NotSupportedException();

        IDisposable ISourceBlock<TTo>.LinkTo(
            ITargetBlock<TTo> target,
            DataflowLinkOptions linkOptions) => throw new NotSupportedException();
    }
}
Choriamb answered 26/6, 2020 at 13:55
