TPL Dataflow Speedup?

I wonder whether the following code can be optimized to execute faster. I currently seem to max out at around 1.4 million simple messages per second on a pretty simple dataflow structure. I am aware that this sample passes/transforms messages synchronously; however, I am testing TPL Dataflow as a possible replacement for my own custom solution based on Tasks and concurrent collections. I know the term "concurrent" suggests I run things in parallel, but for current testing purposes I pushed messages through my own solution synchronously as well, and there I get to about 5.1 million messages per second. What am I missing here? I read that TPL Dataflow was pitched as a high-throughput, low-latency solution, but so far I must be overlooking some performance tweaks. Could anyone point me in the right direction, please?

using System;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

class TPLDataFlowExperiments
{
    public TPLDataFlowExperiments()
    {
        var buf1 = new BufferBlock<int>();

        var transform = new TransformBlock<int, string>(t =>
            {
                return "";
            });

        var action = new ActionBlock<string>(s =>
            {
                //Thread.Sleep(100);
                //Console.WriteLine(s);
            });

        buf1.LinkTo(transform);
        transform.LinkTo(action);

        //Propagate all Completions down the flow
        buf1.Completion.ContinueWith(t =>
        {
            transform.Complete();
            transform.Completion.ContinueWith(u =>
            {
                action.Complete();
            });
        });

        Stopwatch watch = new Stopwatch();
        watch.Start();

        int cap = 10000000;
        for (int i = 0; i < cap; i++)
        {
            buf1.Post(i);
        }

        //Mark Buffer as Complete
        buf1.Complete();

        action.Completion.ContinueWith(t =>
            {
                watch.Stop();

                Console.WriteLine("All Blocks finished processing");
                Console.WriteLine("Units processed per second: " + cap / watch.ElapsedMilliseconds * 1000);
            });

        Console.ReadLine();
    }
}
Smilax answered 22/6, 2012 at 11:12 Comment(0)

I think this mostly comes down to one thing: your test is pretty much meaningless. All those blocks are supposed to do something, and use multiple cores and asynchronous operations to do that.

Also, in your test, a lot of time is likely spent on synchronization. With more realistic code, each block will take some time to execute, so there will be less contention and the actual overhead will be smaller than what you measured.

But to actually answer your question: yes, you're overlooking a performance tweak. Specifically, SingleProducerConstrained, which means data structures with less locking can be used. If I set this option on both blocks (the BufferBlock is completely useless here; you can safely remove it), the rate rises from about 3–4 million items per second to more than 5 million on my computer.
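
For illustration, here is a minimal sketch of the same pipeline with that option applied (the lambdas are still the same placeholders as in your test, and the exact throughput will of course vary by machine):

var options = new ExecutionDataflowBlockOptions { SingleProducerConstrained = true };

// The BufferBlock is gone; post straight into the TransformBlock.
// SingleProducerConstrained is only an optimization hint; it is safe here
// because a single thread does all the posting.
var transform = new TransformBlock<int, string>(i => "", options);
var action = new ActionBlock<string>(s => { }, options);

transform.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10000000; i++)
{
    transform.Post(i);
}

transform.Complete();
action.Completion.Wait();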

Warton answered 22/6, 2012 at 13:7 Comment(5)
That's the number I got on my computer when running your code. And what I was trying to say is that you can't compare performance like this; it's not a good representation of real performance. – Warton
I take your criticism, thanks for that. Can I revise some code, run more tests, and post an update? Would you mind taking another look afterwards? Thanks a lot. – Smilax
Sure, I'll have a look when you post it. – Warton
I marked your solution as the accepted one. I played a lot with IPropagatorBlock (remember, you proposed a solution in a TPL Dataflow related question I asked a couple of days ago), and running the actions in the TransformBlock in parallel (as intended in most scenarios, I guess), setting SingleProducerConstrained to true (applicable in my project), propagating completion in every LinkTo(...), and throwing actual work at each dataflow block shows a huge improvement over my current framework. – Smilax
...The current framework was able to process about 3.2 million items per second (at full workload), while TPL Dataflow (using your IPropagatorBlock solution) sped things up to more than 5 million items per second. Awesome. Whetted my appetite to delve further into the inner workings of TPL Dataflow. – Smilax

To add to svick's answer, the test uses only a single processing thread for a single action block. This way it tests nothing more than the overhead of using the blocks.

TPL Dataflow works in a manner similar to F# agents, Scala actors, and MPI implementations. Each action block executes a single task at a time, listening to input and producing output. Speedup is provided by breaking an algorithm into steps that can be executed independently on multiple cores, passing only messages to each other.

While you can increase the number of concurrent tasks, the most important issue is designing a flow in which as many steps as possible execute independently of the others.
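
As a rough sketch of that idea (the stages and the work inside each block are made-up placeholders), a flow whose stages run independently of each other could look like this:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

// Three independent stages: each block processes one message at a time,
// but the stages themselves run concurrently with each other.
var parse = new TransformBlock<string, int>(s => int.Parse(s));
var square = new TransformBlock<int, int>(n => n * n);
var print = new ActionBlock<int>(n => Console.WriteLine(n));

parse.LinkTo(square, linkOptions);
square.LinkTo(print, linkOptions);

parse.Post("42");
parse.Complete();
print.Completion.Wait();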

Chacha answered 22/6, 2012 at 14:21 Comment(1)
Thanks for the comments, but I think you meant to say dataflow block, not action block, correct? ActionBlocks do not produce output. – Smilax

You can also increase the degree of parallelism for dataflow blocks. This may offer an additional speedup and can also help with load balancing between linear tasks if you find that one of your blocks acts as a bottleneck for the rest.
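
For example, via ExecutionDataflowBlockOptions.MaxDegreeOfParallelism (the value of 8 and the work inside the block are placeholders):

// Allow the block to process up to 8 messages concurrently.
// This only pays off when the per-message work is non-trivial.
var parallelOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 };

var transform = new TransformBlock<int, string>(i => i.ToString(), parallelOptions);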

Freebooter answered 3/12, 2013 at 21:17 Comment(0)

If your workload is so granular that you expect to process millions of messages per second, then passing individual messages through the pipeline is not viable because of the per-message overhead. You'll need to chunk the workload by batching the messages into arrays or lists. For example:

var transform = new TransformBlock<int[], string[]>(batch =>
{
    var results = new string[batch.Length];
    for (int i = 0; i < batch.Length; i++)
    {
        results[i] = ProcessItem(batch[i]);
    }
    return results;
});

For batching your input you could use a BatchBlock, the "linqy" Buffer extension method from the System.Interactive package, the functionally similar Batch method from the MoreLinq package, or do it manually.
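
For instance, with a BatchBlock the input side could look roughly like this (the batch size of 1000 is a placeholder, and transform is the block from the snippet above):

var batcher = new BatchBlock<int>(1000);

// BatchBlock<int> emits int[] batches, which matches the input of the
// TransformBlock<int[], string[]> shown above.
batcher.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10000000; i++)
{
    batcher.Post(i);
}

batcher.Complete(); // Complete() also flushes the final, possibly smaller, batch.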

Fulmar answered 11/6, 2020 at 11:42 Comment(0)
