Is this a job for TPL Dataflow?

I run a pretty typical producer/consumer model on different tasks.

Task 1: Reads batches of byte[] from binary files and kicks off a new task for each collection of byte arrays (the operation is batched for memory-management purposes).

Tasks 2-n: These are worker tasks. Each operates on a collection of byte arrays passed in from Task 1: it deserializes the byte arrays, sorts the results by certain criteria, and then stores the resulting collection of objects (each byte array deserializes into one such object) in a ConcurrentDictionary.

Task (n+1): I chose a concurrent dictionary because the job of this task is to merge the collections stored in the concurrent dictionary in the same order in which they originated from Task 1. I achieve that by passing a collectionID (of type int, incremented for each new collection within Task 1) all the way down from Task 1 to this task. This task basically checks whether the next expected collectionID is already stored in the concurrent dictionary; if so, it takes it out, adds it to a final queue, and checks for the next collection in the concurrent dictionary.

Now, from what I have read and the videos I have watched, it seems to me that TPL Dataflow may be the perfect candidate for such a producer/consumer model. I just cannot seem to devise a design and get started, because I have never worked with TPL Dataflow. In terms of throughput and latency, is this library even up to the task? I currently process 2.5 million byte arrays, and thus objects, per second in the resulting collections. Can TPL Dataflow help to simplify things? I am especially interested in the answer to the following question: can TPL Dataflow preserve the order of the collection batches from Task 1 when spawning off worker tasks and re-merging them once the worker tasks have done their work? Does it optimize things? Having profiled the whole structure, I feel there is quite some time wasted due to spinning and too many concurrent collections being involved.

Any ideas, thoughts?

Unwinking asked 15/6, 2012 at 14:46

EDIT: Turns out I was very wrong. TransformBlock does return items in the same order they came in, even if it is configured for parallelism. Because of that, the code in my original answer is completely useless and a normal TransformBlock can be used instead.
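
For illustration, here is a minimal sketch of that behavior (not part of the original answer; it assumes using System.Threading.Tasks.Dataflow; and an async context): a TransformBlock configured for parallelism still emits its results in input order.

var block = new TransformBlock<int, int>(
    n => n * n,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

for (int i = 0; i < 10; i++)
    block.Post(i);
block.Complete();

// Results come out as 0, 1, 4, 9, ... despite the parallel processing.
while (await block.OutputAvailableAsync())
    Console.WriteLine(await block.ReceiveAsync());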


Original answer:

As far as I know, only one parallelism construct in .NET supports returning processed items in the order they came in: PLINQ with AsOrdered(). But it seems to me that PLINQ doesn't fit what you want well.
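
For reference, the PLINQ shape mentioned above looks roughly like this (a sketch; Deserialize stands in for whatever per-item transformation is needed):

var objects = byteArrays
    .AsParallel()
    .AsOrdered()                          // results keep the source order
    .Select(bytes => Deserialize(bytes));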

TPL Dataflow, on the other hand, fits well, I think, but it doesn't have a block that supports parallelism and returning items in order at the same time (TransformBlock supports both, but not both at once). Fortunately, Dataflow blocks were designed with composability in mind, so we can build our own block that does that.

But first, we have to figure out how to order the results. Using a concurrent dictionary, as you suggested, along with some synchronization mechanism, would certainly work. But I think there is a simpler solution: use a queue of Tasks. In the output task, you dequeue a Task, wait for it to complete (asynchronously), and when it does, you send its result along. We still need some synchronization for the case when the queue is empty, but we can get that for free if we choose the queue cleverly.

So, the general idea is like this: what we're writing will be an IPropagatorBlock, with some input and some output. The easiest way to create a custom IPropagatorBlock is to create one block that processes the input, another block that produces the results and treat them as one using DataflowBlock.Encapsulate().

The input block will have to process the incoming items in the correct order, so no parallelization there. It will create a new Task (actually, a TaskCompletionSource, so that we can set the result of the Task later), add it to the queue and then send the item for processing, along with some way to set the result of the correct Task. Because we don't need to link this block to anything, we can use an ActionBlock.

The output block will have to take Tasks from the queue, asynchronously wait for them, and then send them along. But since all blocks have a queue embedded in them, and blocks that take delegates have asynchronous waiting built-in, this will be very simple: new TransformBlock<Task<TOutput>, TOutput>(t => t). This block will work both as the queue and as the output block. Because of this, we don't have to deal with any synchronization.

The last piece of the puzzle is actually processing the items in parallel. For this, we can use another ActionBlock, this time with MaxDegreeOfParallelism set. It will take the input, process it, and set the result of the correct Task in the queue.

Put together, it could look like this:

public static IPropagatorBlock<TInput, TOutput>
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform)
{
    // Serves both as the queue of Tasks and as the output block: the identity
    // delegate returns a Task<TOutput>, so the block awaits each Task and
    // forwards its result, in the order the Tasks were enqueued.
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    // Processes items in parallel and sets the result of the corresponding
    // Task through the supplied callback.
    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    // Receives the input sequentially, so the Tasks enter the queue in the
    // original order before the items fan out to the parallel processor.
    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    // When the input side completes, complete the other two blocks as well.
    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

After so much talk, that's quite a small amount of code, I think.

It seems you care a lot about performance, so you might need to fine-tune this code. For example, it might make sense to set MaxDegreeOfParallelism of the processor block to something like Environment.ProcessorCount, to avoid oversubscription. Also, if latency is more important to you than throughput, it might make sense to set MaxMessagesPerTask of the same block to 1 (or another small number), so that when processing of an item is finished, it's sent to the output immediately.
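
As a sketch, those two tweaks together would look like this in the processor block's options:

new ExecutionDataflowBlockOptions
{
    // avoid oversubscription by matching the number of cores
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    // prefer latency over throughput: hand finished items along sooner
    MaxMessagesPerTask = 1
}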

Also, if you want to throttle incoming items, you could set the BoundedCapacity of the enqueuer block.
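
A possible usage sketch (the block names parser and printer are made up for illustration): call the factory method, link the returned block to a downstream block, and feed it with SendAsync():

var parser = CreateConcurrentOrderedTransformBlock<string, int>(int.Parse);
var printer = new ActionBlock<int>(n => Console.WriteLine(n));
parser.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var s in new[] { "10", "20", "30" })
    await parser.SendAsync(s);
parser.Complete();
await printer.Completion; // prints 10, 20, 30 in input order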

Rustle answered 15/6, 2012 at 17:46. Comments (8):
Wow, quite a bunch of goodies, which I would first like to digest and try out. Thanks a lot for those; at the very least this deserved an upvote ;-) Let me play with those ideas and I'll get back. Queuing Tasks makes a lot of sense, and I wonder why I did not think of that earlier. – Unwinking
OK, I spent some time going through your post and reading up on TPL Dataflow. Here are a couple of questions to fully understand your proposed solution: (1) Why do you suggest a custom IPropagatorBlock and DataflowBlock.Encapsulate() given that a TransformBlock<TIn, TOut> already exists? (2) I fail to see how you actually plan to link up the blocks. You talk first of ActionBlocks, then of TransformBlocks. From what I read, would the ActionBlock not be the "end point" of the whole architecture? – Unwinking
1. That's explained in the second paragraph: TransformBlock isn't able to process the items in parallel and return them in order at the same time. It can do either one of them, but not both. – Rustle
2. That's all in the code. I didn't actually use LinkTo() here, but instead use SendAsync() in enqueuer twice. I could make enqueuer into a TransformBlock and use one LinkTo() and one SendAsync(), but I liked it this way more, because it's "symmetric". And ActionBlock usually is an end point, but the code inside it can do anything you want, including sending items to other blocks, which is what I do. – Rustle
And I talk first about an ActionBlock (enqueuer), then about a different block that is a TransformBlock (queue), and finally about yet another block that is an ActionBlock (processor). – Rustle
Maybe this is the dumbest of my questions here, but how do you implement the above code? I assume I need to implement the whole interface to actually test it? I want to run it because I think I can understand it better when stepping through each line of code in debug mode. – Unwinking
No, you don't need to implement any interface. Just put the method in your code and call it (passing in the function that will execute in parallel); it will return a new block that does what you asked for. To test it out, you can link it to another ActionBlock and then use Post() or SendAsync() to add items for processing. – Rustle
I marked your answer as the desired solution, awesome. Admittedly, it took quite a bit of time to fully understand, play with, implement, and performance-test, but I am finally there, and your TPL Dataflow solution vastly outperforms what I had previously implemented using concurrent collections and lots of boilerplate code. Awesome; this really motivates me to dig deeper into TPL Dataflow. – Unwinking

Yes, the TPL Dataflow library is well suited for this job. It supports all the features that you need: MaxDegreeOfParallelism, BoundedCapacity and EnsureOrdered. But using the BoundedCapacity option requires some attention to detail.

First, you must make sure that you feed the first block in the pipeline with the SendAsync method. Otherwise, if you use the Post method and ignore its return value, you may lose messages. SendAsync will never lose messages, because it asynchronously blocks the caller until there is free space for the incoming message in the block's internal buffer.
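
To illustrate the difference (a sketch, assuming a bounded block named block and an item named item):

bool accepted = block.Post(item); // false when the buffer is full;
                                  // ignoring it silently drops the message
await block.SendAsync(item);      // waits asynchronously until there is room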

Second, you must ensure that a possible exception in a downstream block will not block the feeder indefinitely, awaiting free space that will never come. There is no built-in way to make this happen automatically by configuring the blocks. Instead, you must manually propagate the completion of the downstream blocks to the upstream blocks. This is the intention of the PropagateFailure method in the example below:

public static async Task ProcessAsync(string[] filePaths,
    ConcurrentQueue<MyClass> finalQueue)
{
    var reader = new TransformBlock<string, byte[]>(filePath =>
    {
        byte[] result = ReadBinaryFile(filePath);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // This is the default
        BoundedCapacity = 20, // keep memory usage under control
        EnsureOrdered = true // This is also the default
    });

    var deserializer = new TransformBlock<byte[], MyClass>(bytes =>
    {
        MyClass result = Deserialize(bytes);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 20
    });

    var writer = new ActionBlock<MyClass>(obj =>
    {
        finalQueue.Enqueue(obj);
    });

    reader.LinkTo(deserializer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(deserializer, reader); // Link backwards

    deserializer.LinkTo(writer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(writer, deserializer); // Link backwards

    foreach (var filePath in filePaths)
    {
        var accepted = await reader.SendAsync(filePath).ConfigureAwait(false);
        if (!accepted) break; // This will happen in case that the block has failed
    }
    reader.Complete(); // This will be ignored if the block has already failed

    await writer.Completion; // This will propagate the first exception that occurred
}

public static async void PropagateFailure(IDataflowBlock block1,
    IDataflowBlock block2)
{
    try { await block1.Completion.ConfigureAwait(false); }
    catch (Exception ex) { block2.Fault(ex); }
}
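
A hypothetical invocation of the above (the directory path is made up; it assumes using System.IO and System.Collections.Concurrent):

var finalQueue = new ConcurrentQueue<MyClass>();
string[] filePaths = Directory.GetFiles(@"C:\data", "*.bin");
await ProcessAsync(filePaths, finalQueue);
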
Elaterid answered 7/6, 2020 at 16:32
