Use TPL Dataflow to encapsulate pipeline ending in an action block
Asked Answered
B

2

8

TPL Dataflow provides a very useful function:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)

to enable you to, well, encapsulate multiple blocks into a single transform block. It returns a

IPropagatorBlock<TInput, TOutput>

which represents the start and end blocks of your pipeline.

However, if the last block in my pipeline is an ActionBlock, I can't use this, since an ActionBlock is not a SourceBlock, and the return type of the function would be an ITargetBlock, not an IPropagatorBlock.

Essentially, what I'm looking for is something like this function:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)

Is this a sensible thing to write, or am I missing something simple? I'm not quite sure how to write it - particularly wiring up the completion. Would I need to create my own custom block type?

EDIT:

Ok, so having read the response from @Panagiotis Kanavos, and done some tinkering, I've come up with this. This is based on the EncapsulatingPropagator class, which is what the existing DataflowBlock.Encapsulate method uses:

internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
        private readonly ITargetBlock<TStart> startBlock;

        private readonly ActionBlock<TEnd> endBlock;

        public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
        {
            this.startBlock = startBlock;
            this.endBlock = endBlock;
        }

        public Task Completion
        {
            get { return this.endBlock.Completion; }
        }

        public void Complete()
        {
            this.startBlock.Complete();
        }

        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            this.startBlock.Fault(exception);
        }

        public DataflowMessageStatus OfferMessage(
            DataflowMessageHeader messageHeader, 
            TStart messageValue, 
            ISourceBlock<TStart> source, 
            bool consumeToAccept)
        {
            return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
    }
Blenheim answered 8/9, 2015 at 14:2 Comment(2)
And I just finished posting a similar update! Nice job on making this generic - this allows you to separate building the module/fragment from the boilerplateEmeliaemelin
Being able to treat a "terminating" pipeline's initial target and completions as a single ActionBlock-like entity seems so obvious a need that the gap in API almost feels... purposeful. But I'll whistle past the graveyard and use your work here. Thanks.Mediate
E
4

Encapsulate isn't used to abstract an existing pipeline, it's used to create a propagator block that requires custom behaviour that can't be implemented using the existing blocks and links.

For example, the Sliding Window sample buffers all incoming messages posted to its input block and outputs a batch of all retrieved messages when a sliding window expires to its output block.

The names of the method create a lot of confusion but they do make sense when you understand their purpose:

  • The target argument is the target (input) endpoint to which preceding blocks will connect to send messages. In this case, an ActionBlock that processes incoming messages and decides whether or not to post to the output (source) block makes sense.
  • The source argument is the source (output) endpoint to which succeeding steps will connect to receive messages. It doesn't make sense to use an ActionBlock as a source because it doesn't have any output.

An Encapsulate variant that accepts an ActionBlock method as source isn't useful because you can simply link from any previous step to an action block.

EDIT

If you want to modularize a pipeline, ie break it into reuseable, more manageable you can create a class that construct, you can use a plain old class. In that class, you build your pipeline fragment as normal, link the blocks (ensuring completion is propagated) and then expose the first step and the Completion task of the last step as public properties, eg:

class MyFragment
{
    public TransformationBlock<SomeMessage,SomeOther> Input {get;}

    public Task Completion {get;}

    ActionBlock<SomeOther> _finalBlock;

    public MyFragment()
    {
        Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
        _finalBlock=new ActionBlock<SomeOther>(MyMethod);
        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
        Input.LinkTo(_finalBlock,linkOptions);
    }

    private SomeOther MyFunction(SomeMessage msg)
    {
    ...
    }

    private void MyMethod(SomeOther msg)
    {
    ...
    }
}

To connect the fragment to a pipeline, you only need to link from a pipeline block to the exposed Input block. To await completion, just await on the exposed Completion task.

You could stop here if you want, or you can implement ITargetBlock to make the fragment look like a Target block. You just need to delegate all methods to the Input block and the Completion property to the final block.

Eg:

class MyFragment:ITargetBlock<SomeMessage> 
{
    ....

    public Task Completion {get;}

    public void Complete()
    {
        Input.Complete()
    };

    public void Fault(Exception exc)
    {
        Input.Fault(exc);
    }

    DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
    {
        return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
    }
}

EDIT 2

Using @bornfromanegg 's class one can separate the act of building the fragment from the boilerplate that exposes the Input and Completion:

public ITargetBlock<SomeMessage> BuildMyFragment()
{
    var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
    var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
    var finalBlock=new ActionBlock<SomeFinal>(MyMethod);

    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}

    input.LinkTo(step2,linkOptions);
    step2.LinkTo(finalBlock,linkOptions);

    return new EncapsulatingTarget(input,finalBlock);
}
Emeliaemelin answered 8/9, 2015 at 14:28 Comment(5)
I agree up to a point. In fact, not only does it not make sense to use ActionBlock as a source - it's not possible to use it as a source because it does not implement ISourceBlock. But I still have a problem to solve: I have a pipeline where the last block is an action block, and I want to encapsulate it.Blenheim
The Encapsulate method has a very specific (and limited) purpose - it's not a generic encapsulation mechanism. It is useful when you need to include a complex graph or code with well defined input and output endpoints in a pipeline. It's a shortcut to implementing your own block class. In your case, you only need a link from the rest of the pipeline to your first block and awaiting on the Completion task of the last block - just as you would with any pipeline. You can even include the blocks in a class, exposing the input block and the ActionBlock's Completion tasks as properties.Emeliaemelin
I think what you actually mean here is to modularize, not encapsulate a pipeline.Emeliaemelin
Yes, that is perhaps what I mean. I'll edit the question.Blenheim
Splendid - that's exactly what I was looking for. Thank you.Blenheim
M
2

In my case, I wanted to encapsulate a network that comprised multiple final ActionBlocks, with a summarized completion, so the solution outlined in the edited question didn't work.

Because the only interaction with the "final block" surrounds completion, it suffices to present just the completion task for the encapsulation. (Added target-action constructor per suggestion.)

public class EncapsulatingTarget<TInput> : ITargetBlock<TInput>
{
    private readonly ITargetBlock<TInput> startBlock;

    private readonly Task completion;

    public EncapsulatingTarget(ITargetBlock<TInput> startBlock, Task completion)
    {
        this.startBlock = startBlock;
        this.completion = completion;
    }

    public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
    {
        this.startBlock = startBlock;
        completion = endBlock.Completion;
    }

    public Task Completion => completion;

    public void Complete()
    {
        startBlock.Complete();
    }

    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException("exception");
        }

        startBlock.Fault(exception);
    }

    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        TInput messageValue,
        ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}

An example usage:

public ITargetBlock<Client.InputRecord> BuildDefaultFinalActions()
{
    var splitter = new BroadcastBlock<Client.InputRecord>(null);
    var getresults = new TransformManyBlock(...);    // propagator
    var saveinput = new ActionBlock(...);
    var saveresults = new ActionBlock(...);

    splitter.LinkTo(saveinput, PropagateCompletion);
    splitter.LinkTo(getresults, PropagateCompletion);
    getresults.LinkTo(saveresults, PropagateCompletion);

    return new Util.EncapsulatedTarget<Client.InputRecord>(splitter, Task.WhenAll(saveinput.Completion, saveresults.Completion));
}

I could have made the signature EncapsulatingTarget<T>(ITargetBlock<T> target, params Task[] completions) and moved the WhenAll(...) inside the constructor, but didn't want to make assumptions about desired completion notification.

Mediate answered 24/1, 2018 at 22:10 Comment(1)
This looks like a useful addition. In fact, I see no reason why you couldn't have both those constructors, as well as the original which takes an ActionBlock. That way you give the caller the choice as to which to use.Blenheim

© 2022 - 2024 — McMap. All rights reserved.