How to properly manage Completion in TPL Dataflow
I've created something similar to a web crawler to generate a report on the 1000+ web services I need to manage. For that I created a TPL Dataflow pipeline to fetch and process the data. The pipeline I have in mind looks a little like this (sorry for my Paint skills :D): The Pipeline

I had already created an implementation, and everything worked fine until I started the pipeline as a whole. I fed 500 objects into the pipeline as input and expected the program to run for a while, but it stopped executing after moving on to the Execution Block. After checking the flow of the program, it seemed to me that the completion propagated too fast to the Dispose Block. I created a small sample project with the same pipeline to check whether the problem was my implementation of the input classes or the pipeline itself. The sample code is this:

public class Job
{
    public int Ticker { get; set; }

    public Type Type { get; }

    public Job(Type type)
    {
        Type = type;
    }

    public Task Prepare()
    {
        Console.WriteLine("Preparing");
        Ticker = 0;
        return Task.CompletedTask;
    }

    public Task Tick()
    {
        Console.WriteLine("Ticking");
        Ticker++;
        return Task.CompletedTask;
    }

    public bool IsCommitable()
    {
        Console.WriteLine("Trying to commit");
        return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
    }

    public bool IsFinished()
    {
        Console.WriteLine("Trying to finish");
        return Ticker == 1000000;
    }

    public void IntermediateCleanUp()
    {
        Console.WriteLine("intermediate Cleanup");
        Ticker = Ticker - 120;
    }

    public void finalCleanUp()
    {
        Console.WriteLine("Final Cleanup");
        Ticker = -1;
    }
}

This is my input class, which is fed into the Preparation Block.
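(The sample also references a Type enum that isn't shown in the post; presumably it is just the following.)

public enum Type
{
    A,
    B
}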

public class Dataflow
{
    private TransformBlock<Job, Job> _preparationsBlock;

    private BufferBlock<Job> _balancerBlock;

    private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 4
    };

    private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    private TransformBlock<Job, Job> _typeATickBlock;

    private TransformBlock<Job, Job> _typeBTickBlock;

    private TransformBlock<Job, Job> _writeBlock;

    private TransformBlock<Job, Job> _intermediateCleanupBlock;

    private ActionBlock<Job> _finalCleanupBlock;

    public async Task Process()
    {
        CreateBlocks();

        ConfigureBlocks();

        for (int i = 0; i < 500; i++)
        {
            await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
        }
        _preparationsBlock.Complete();

        await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
    }

    private void CreateBlocks()
    {
        _preparationsBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Prepare();
            return job;
        }, _options);

        _balancerBlock = new BufferBlock<Job>(_options);

        _typeATickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            return job;
        }, _options);

        _typeBTickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            await job.Tick();
            return job;
        }, _options);

        _writeBlock = new TransformBlock<Job, Job>(job =>
        {
            Console.WriteLine(job.Ticker);
            return job;
        }, _options);

        _finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);

        _intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
        {
            job.IntermediateCleanUp();
            return job;
        }, _options);
    }

    private void ConfigureBlocks()
    {
        _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

        _balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
        _balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);

        _typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
        _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());

        _typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());

        _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
        _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

        _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
    }
}

This is my Dataflow pipeline, representing my "artwork" above :D. All of this is executed by my Scheduler, which is started in Program.cs:

public class Scheduler
{
    private readonly Timer _timer;

    private readonly Dataflow _flow;


    public Scheduler(int interval)
    {
        _timer = new Timer(interval);
        _flow = new Dataflow();
    }

    public void Start()
    {
        _timer.AutoReset = false;
        _timer.Elapsed += _timer_Elapsed;
        _timer.Start();
    }

    private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        try
        {
            _timer.Stop();
            Console.WriteLine("Timer stopped");
            await _flow.Process().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            Console.WriteLine("Timer started again.");
            _timer.Start();
        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        var scheduler = new Scheduler(1000);
        scheduler.Start();

        Console.ReadKey();

    }
}
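For anyone trying to compile the sample: it assumes roughly the following using directives. The Timer here is System.Timers.Timer, and the dataflow blocks come from the System.Threading.Tasks.Dataflow NuGet package.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow
using System.Timers;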

The console output I am getting is:

Timer stopped
Preparing
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Trying to commit
Trying to finish

It seems like the program has stopped working at that point, because I am not hitting any breakpoints or getting any further. I think all my blocks have already received a completion signal and therefore stop accepting new items. Hence my question: how do I manage the completion signal so that the pipeline only finishes when there is no more work to do?

Synchronize asked 24/3/2019 at 13:19. Comments (7):
I read this out loud to my girlfriend: "Timer stopped Preparing Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Trying to commit Trying to finish". She says it sounds like a computer having sex :) :) (Luttrell)
Nice one. The "real" program doesn't sound like this. I just didn't want to post 5,000 lines here, because that would probably not be easy to debug with all that overhead. (Synchronize)
Nothing wrong with your question, I just found it really funny when she said that. Just wanted to lighten up the mood. (Luttrell)
One way to achieve that is to keep a counter for your jobs, so you know that every job that entered the pipeline has been processed, and only complete the pipeline after that. (Chapel)
Studying your pipeline, it seems your typeBTickBlock is backing up your pipeline. It has a capacity of 4 and will not release its jobs anywhere, so it will eventually back up the upstream blocks. Debug and check your buffers; you may find they're full of type Bs. (Nebo)
If you're not going to process type B jobs, then you should send them to a NullTarget instead. That will drop all the messages and prevent a backup. (Nebo)
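For reference, DataflowBlock.NullTarget<T>() is part of the dataflow library; routing the type B jobs into it from the balancer could look like this one-liner in the sample's ConfigureBlocks:

_balancerBlock.LinkTo(DataflowBlock.NullTarget<Job>(), job => job.Type == Type.B);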
@Nebo Ah okay, thanks for the info. However, even when I guarantee that no B jobs get created, either by removing the line that creates them (so that only As get created) or by linking the typeBTickBlock to the final cleanup block, I am not getting much further. It seems the typeATickBlock has been told it has completed and therefore does not accept more items. The reason I think so is that when I remove the typeBTickBlock from the pipeline, the timer restarts before even one item has been fully processed. (Synchronize)

The main issue with your flow is the feedback loop to your tick block. This causes two problems.

  • Back Pressure
  • Completion Flow

First: Back Pressure

When _typeATickBlock is linked back to itself, it will stop accepting messages once it has reached its capacity. In your case that capacity is 4, which means that once it has 3 messages in its output buffer and one being processed, it will stop accepting and passing messages. You can see this by adding the following line to the block:

Console.WriteLine($"Tick Block {_typeATickBlock.InputCount}/{_typeATickBlock.OutputCount}");

And will output:

Tick Block 0/3

To fix this you can add a buffering block, either a BufferBlock or a TransformBlock. The key is the bounded capacity of the buffer. In your case every single message will need to be rerouted back to the tick block, so the capacity needs to match the volume of messages in flight at any given time; in this case, 500.

_printingBuffer = new TransformBlock<Job, Job>(job =>
{
    Console.WriteLine($"{_printingBuffer.InputCount}/{_printingBuffer.OutputCount}");
    return job;
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 500 });

In your real code you may not know the value, and DataflowBlockOptions.Unbounded may be your best option to avoid deadlocking your pipeline, but you can tune this value based on your incoming volume.
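For example, a sketch of the same buffer with unbounded capacity (the count printing omitted for brevity):

_printingBuffer = new TransformBlock<Job, Job>(job => job,
    new ExecutionDataflowBlockOptions { BoundedCapacity = DataflowBlockOptions.Unbounded });

Note that Unbounded is also the default, so simply omitting BoundedCapacity has the same effect.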

Second: Completion Flow

With a feedback loop in your pipeline, completion propagation becomes more difficult than simply setting the link options. Once completion hits the tick block, it stops accepting all messages, even the ones that still need to be processed. To avoid this you need to hold propagation until all messages have passed through the loop: first you stop propagation just before the tick block, then you check the buffers on each block that participates in the loop, and once all buffers are empty you propagate completion (and faults) to the block.

_balancerBlock.Completion.ContinueWith(tsk =>
{
    // Spin until the loop has fully drained, then complete the tick block.
    while (!_typeATickBlock.Completion.IsCompleted)
    {
        if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
            && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
        {
            _typeATickBlock.Complete();
        }
    }
});
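As written, this continuation busy-waits on the block counters. A gentler variant (a sketch, not from the original answer) awaits the balancer's completion and polls with a small delay instead; it could be started from ConfigureBlocks with _ = CompleteTickBlockWhenDrainedAsync();

private async Task CompleteTickBlockWhenDrainedAsync()
{
    await _balancerBlock.Completion;

    // Poll until nothing is buffered anywhere in the loop, yielding between checks.
    // Caveat: an item currently inside the transform delegate is counted in
    // neither InputCount nor OutputCount, so this shares the original's race
    // (see the comment thread below).
    while (_printingBuffer.InputCount > 0 || _printingBuffer.OutputCount > 0
        || _typeATickBlock.InputCount > 0 || _typeATickBlock.OutputCount > 0)
    {
        await Task.Delay(100);
    }

    _typeATickBlock.Complete();
}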

Last

Your complete ConfigureBlocks, with the completion setup and the buffer inserted, should look something like this. Note that I'm only propagating completion, not faults, here, and that I removed the type B branch.

private void ConfigureBlocks()
{
    _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

    _balancerBlock.LinkTo(_typeATickBlock, job => job.Type == Type.A);

    _balancerBlock.Completion.ContinueWith(tsk =>
    {
        while (!_typeATickBlock.Completion.IsCompleted)
        {
            if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
            && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
            {
                _typeATickBlock.Complete();
            }
        }
    });

    _typeATickBlock.LinkTo(_printingBuffer, job => !job.IsCommitable());
    _printingBuffer.LinkTo(_typeATickBlock);
    _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());            

    _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
    _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

    _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}

I wrote a blog post a while back about handling completion with feedback loops; the blog is no longer active, but the post may provide some more help. It was retrieved from the Wayback Machine:

Finding Completion in a Complex Flow: Feedback Loops

Nebo answered 25/3/2019 at 18:46. Comments (9):
Nice answer (and nice blog post :D). The idea of checking the input and output queues is quite nice. However, I have two small questions. Shouldn't I check the queues of _writeBlock and _intermediateCleanupBlock as well, because they also form a loop back to _typeATickBlock? And what about the jobs that are currently being processed; are they counted in either InputCount or OutputCount? The idea behind the question is that normally one job can be long-running (think 4 minutes) in _typeATickBlock and is not commitable after only one round, so it would need to go back again. (Synchronize)
Ah yes, I put in the minimum number of checks to get your flow to run properly, based on the quick processing time of each tick. You're right that you will have to check each block in your loop: right now, counting the new buffer, you have 4 blocks that each need an in-process counter check and a buffer check to make sure no items are processing and no items are waiting. As you can imagine, this adds a lot of code; the blog post goes into more detail about how to abstract away some of the added complexity. In short, dataflow loops are not easy. (Nebo)
I can post snippets from the post in the answer if you have specific questions about how to implement the strategies. Another option is the DataflowEx library, which has another implementation of the feedback-loop pattern and much more; I'd recommend looking into it. It comes with a lot of helpful features for working with dataflow. (Nebo)
Specifically, this may help: Cyclic graph and ring completion detection (Nebo)
Thanks for your answer. It really guided me to (what I believe is) the best solution. In my real application I know beforehand how many jobs I have, and I know that a job has successfully processed (or faulted out of) the loop when it arrives at the last block (the Dispose Block). Therefore I created a class variable that I increment using Interlocked.Increment() in the Dispose Block. Then I wait, in a similar fashion to your proposal, until my local job count equals the known maximum job count, and then complete my dataflow pipeline. That seems to work (but I am still testing it). (Synchronize)
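A minimal sketch of the counting approach described in the comment above, with hypothetical names (Interlocked and Volatile live in System.Threading; it assumes the total job count is known before the pipeline starts):

private int _disposedJobs;
private const int TotalJobs = 500; // known beforehand in this scenario

// The dispose (final cleanup) block counts every job that leaves the loop,
// whether it finished normally or faulted out:
_finalCleanupBlock = new ActionBlock<Job>(job =>
{
    job.finalCleanUp();
    Interlocked.Increment(ref _disposedJobs);
}, _options);

// After all inputs have been sent (e.g. at the end of Process()), wait for
// the last job instead of relying on completion propagating through the loop:
while (Volatile.Read(ref _disposedJobs) < TotalJobs)
{
    await Task.Delay(100);
}
_typeATickBlock.Complete();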
DataflowEx seems quite nice, but a bit too large for my case. The project I am working on is already "enterprise" large, and I am currently the only one managing it, which means I don't really want to take the hit of managing more external dependencies than I need. Unless my solution breaks (or you offer me an even better one :) ), I think I'll stay with my own code. But I'll remember the library (and your blog post is already bookmarked) in case I really see the need for this abstraction. (Synchronize)
Isn't there a race condition in the blog post? It considers a TransformBlock complete if handlingMessages == 0 && HandleMessageBlock.InputCount == 0 && HandleMessageBlock.OutputCount == 0, where the transform Func looks like Interlocked.Increment(ref handlingMessages); ... Interlocked.Decrement(ref handlingMessages);. I think it could prematurely report completion if that check runs right after a message is taken from the input queue but before the transform delegate is invoked; in that case, all counts would be 0. (Ortrude)
Nice answer and explanation, but I'm not a fan of the way the _typeATickBlock block is completed, by looping and checking the InputCount/OutputCount properties. There should be a more robust and efficient solution than that. (Volt)
@TheodorZoulias Yeah I'm not happy with it either, there should be a more robust solution but I haven't spent anytime with this stuff lately to develop one. Feel free to edit or add an answer for future googlers. I think it comes down to just "dataflow loops are hard" lolNebo
