How can a TPL Dataflow block downstream get data produced by a source?

I'm processing images using TPL Dataflow. I receive a processing request, read an image from a stream, apply several transformations, then write the resulting image to another stream:

Request -> Stream -> Image -> Image ... -> Stream

For that I use the blocks:

BufferBlock<Request>
TransformBlock<Request,Stream>
TransformBlock<Stream,Image>
TransformBlock<Image,Image>
TransformBlock<Image,Image>
...
writerBlock = new ActionBlock<Image>

The problem is that the initial Request contains data needed to create the resulting Stream, along with some additional info I need at that point. Do I have to pass the original Request (or some other context object) down the line to the writerBlock through all the other blocks, like this:

TransformBlock<Request,Tuple<Request,Stream>>
TransformBlock<Tuple<Request,Stream>,Tuple<Request,Image>>
TransformBlock<Tuple<Request,Image>,Tuple<Request,Image>>
...

(which is ugly), or is there a way to link the first block to the last one (or, generalizing, to the ones that need the additional data)?

Vie answered 30/1, 2013 at 19:51 Comment(0)

Yes, you pretty much need to do what you described, passing the additional data from every block to the next one.

But using a couple of helper methods, you can make this much simpler:

public static IPropagatorBlock<TInput, Tuple<TOutput, TInput>>
    CreateExtendedSource<TInput, TOutput>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<TInput, Tuple<TOutput, TInput>>(
        input => Tuple.Create(transform(input), input));
}

public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
    CreateExtendedTransform<TInput, TOutput, TExtension>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
        tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2));
}

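The signatures look daunting, but they are actually not that bad: each helper just wraps a TransformBlock whose output tuple pairs the transformed value with the extra data being carried along.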

Also, you might want to add overloads that pass options to the created block, or overloads that take async delegates.
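As a rough sketch of what such an overload could look like, the following variant of CreateExtendedTransform takes an async delegate together with block options. The class name ExtendedBlocks is made up for illustration; the TransformBlock constructor it uses (a Func returning a Task, plus ExecutionDataflowBlockOptions) is part of the library.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical container class; only the overload shape matters here.
public static class ExtendedBlocks
{
    // Async variant: awaits the user's transform and re-attaches the
    // extension value (Item2) to the result.
    public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
        CreateExtendedTransform<TInput, TOutput, TExtension>(
            Func<TInput, Task<TOutput>> transform,
            ExecutionDataflowBlockOptions options)
    {
        return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
            async tuple => Tuple.Create(await transform(tuple.Item1), tuple.Item2),
            options);
    }
}
```

The options are passed straight through to the underlying TransformBlock, so settings such as MaxDegreeOfParallelism or BoundedCapacity apply to the wrapped block as usual.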

For example, if you wanted to perform some operations on a number using separate blocks, while passing the original number along the way, you could do something like:

var source = new BufferBlock<int>();
var divided = CreateExtendedSource<int, double>(i => i / 2.0);
var formatted = CreateExtendedTransform<double, string, int>(d => d.ToString("0.0"));
var writer = new ActionBlock<Tuple<string, int>>(tuple => Console.WriteLine(tuple));

source.LinkTo(divided);
divided.LinkTo(formatted);
formatted.LinkTo(writer);

for (int i = 0; i < 10; i++)
    source.Post(i);

As you can see, your lambdas (except for the last one) deal only with the “current” value (int, double or string, depending on the stage of the pipeline); the “original” value (always int) is passed along automatically. At any point, you can use a block created with the normal constructor to access both values (like the final ActionBlock in the example).

(That BufferBlock isn't actually necessary, but I added it to more closely match your design.)
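If you run this in a console application, the process may exit before the blocks finish, because nothing waits for the pipeline. A minimal sketch of one way to handle that, assuming you link with PropagateCompletion and await the last block, is shown here. The class name ExtendedPipelineDemo and the RunAsync method are made up for illustration, and the results are collected into a list instead of being printed:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ExtendedPipelineDemo
{
    // Same helpers as in the answer, repeated so the sketch is self-contained.
    public static IPropagatorBlock<TInput, Tuple<TOutput, TInput>>
        CreateExtendedSource<TInput, TOutput>(Func<TInput, TOutput> transform)
    {
        return new TransformBlock<TInput, Tuple<TOutput, TInput>>(
            input => Tuple.Create(transform(input), input));
    }

    public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
        CreateExtendedTransform<TInput, TOutput, TExtension>(Func<TInput, TOutput> transform)
    {
        return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
            tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2));
    }

    public static async Task<List<Tuple<string, int>>> RunAsync()
    {
        var results = new List<Tuple<string, int>>();
        // Completion flows from each block to the next one along these links.
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var divided = CreateExtendedSource<int, double>(i => i / 2.0);
        var formatted = CreateExtendedTransform<double, string, int>(d => d.ToString("0.0"));
        var writer = new ActionBlock<Tuple<string, int>>(tuple => results.Add(tuple));

        divided.LinkTo(formatted, link);
        formatted.LinkTo(writer, link);

        for (int i = 0; i < 10; i++)
            divided.Post(i);

        divided.Complete();      // no more input
        await writer.Completion; // wait until every item reached the last block
        return results;
    }
}
```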

Hackamore answered 30/1, 2013 at 20:20 Comment(2)
That's a nice approach. I've read some of your answers on TDF, and you clearly know the subject. Are you saying then there's no way to send the context data across the dataflow network bypassing the intermediate blocks? - Vie
Well, if you're sure that the order of items will be maintained across the whole pipeline (TransformBlock does maintain order), then you could use something like a global queue of the additional data. The first block would add it there and the last block would read it from there. But I think doing something like that will be fragile. And it would mean each input would have to produce exactly one output across the whole pipeline. - Hackamore

I may be in over my head, since I am only starting to play with TPL Dataflow, but I believe you can accomplish this by using a BroadcastBlock as an intermediary between your source and your first target.

A BroadcastBlock can offer each message to many targets, so you use it to offer the message both to your first target and to a JoinBlock at the end, which merges the result with the original message.

source -> Broadcast ->-----------------------------------------> JoinBlock <source, result>
                    -> Transformation1 -> Transformation 'n'  ->

For example:

var source = new BufferBlock<int>();
var transformation =  new TransformBlock<int, int>(i => i * 100);

var broadCast = new BroadcastBlock<int>(null);
source.LinkTo(broadCast);
broadCast.LinkTo(transformation);

var jb = new JoinBlock<int, int>();
broadCast.LinkTo(jb.Target1);
transformation.LinkTo(jb.Target2);

jb.LinkTo(new ActionBlock<Tuple<int, int>>(
          c => Console.WriteLine("Source:{0}, Target Result: {1}", c.Item1, c.Item2)));

source.Post(1);
source.Post(2);

source.Complete();

yields...

Source:1, Target Result: 100

Source:2, Target Result: 200

I am just not too sure about how it would behave in an asynchronous environment.
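One way to probe that is the sketch below, which makes the transformation async and parallel and then checks how the JoinBlock pairs things up. It relies on TransformBlock emitting results in input order by default, and on every input producing exactly one output. The class name BroadcastJoinSketch, the RunAsync method and the artificial delays are made up for illustration. All blocks are left unbounded here; with a bounded target, a BroadcastBlock does not wait for slow consumers, so messages could be missed and the pairing would no longer line up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastJoinSketch
{
    public static async Task<List<Tuple<int, int>>> RunAsync(int count)
    {
        var results = new List<Tuple<int, int>>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Identity cloning function: ints need no real copy.
        var broadcast = new BroadcastBlock<int>(x => x);

        // Async, parallel transformation with varying delays, so items
        // finish out of order internally; output order is still input order.
        var transformation = new TransformBlock<int, int>(
            async i =>
            {
                await Task.Delay((count - i) % 5);
                return i * 100;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        var join = new JoinBlock<int, int>();
        var collector = new ActionBlock<Tuple<int, int>>(pair => results.Add(pair));

        broadcast.LinkTo(transformation, link);
        broadcast.LinkTo(join.Target1, link);      // original value
        transformation.LinkTo(join.Target2, link); // transformed value
        join.LinkTo(collector, link);

        for (int i = 0; i < count; i++)
            broadcast.Post(i);

        broadcast.Complete();
        await collector.Completion;
        return results;
    }
}
```

If any stage filters items or produces several outputs per input (for example a TransformManyBlock), the two sides of the join drift apart, which is the same limitation the comment about a global queue points out.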

Precept answered 17/7, 2015 at 18:21 Comment(0)
