I've created something similar to a web crawler to create a report of the 1000+ Webservices I need to manage. I therefore created a TPL Dataflow Pipeline to manage getting and processing the data. The Pipeline I imagined looks a little bit like this (sorry for my paint skills :D):
I already created a implementation and everything worked fine until I started my Pipeline as a whole. I gave 500 objects into the Pipeline as a Input into the Pipeline and expected that the programm would run a little while but the Programm stopped execution after moving to the Execution Block. After checking the flow of the Programm it seemed to me that the Completion propagated to fast to the Dispose Block. I created a small sample Project with the same Pipeline to check if it was my Implementation of the Input classes or the Pipeline itself. The Sample code is this:
public class Job
{
public int Ticker { get; set; }
public Type Type { get; }
public Job(Type type)
{
Type = type;
}
public Task Prepare()
{
Console.WriteLine("Preparing");
Ticker = 0;
return Task.CompletedTask;
}
public Task Tick()
{
Console.WriteLine("Ticking");
Ticker++;
return Task.CompletedTask;
}
public bool IsCommitable()
{
Console.WriteLine("Trying to commit");
return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
}
public bool IsFinished()
{
Console.WriteLine("Trying to finish");
return Ticker == 1000000;
}
public void IntermediateCleanUp()
{
Console.WriteLine("intermediate Cleanup");
Ticker = Ticker - 120;
}
public void finalCleanUp()
{
Console.WriteLine("Final Cleanup");
Ticker = -1;
}
}
This is my Input class that is entered into the Preparation Block.
public class Dataflow
{
private TransformBlock<Job, Job> _preparationsBlock;
private BufferBlock<Job> _balancerBlock;
private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 4
};
private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
private TransformBlock<Job, Job> _typeATickBlock;
private TransformBlock<Job, Job> _typeBTickBlock;
private TransformBlock<Job, Job> _writeBlock;
private TransformBlock<Job, Job> _intermediateCleanupBlock;
private ActionBlock<Job> _finalCleanupBlock;
public async Task Process()
{
CreateBlocks();
ConfigureBlocks();
for (int i = 0; i < 500; i++)
{
await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
}
_preparationsBlock.Complete();
await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
}
private void CreateBlocks()
{
_preparationsBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Prepare();
return job;
}, _options);
_balancerBlock = new BufferBlock<Job>(_options);
_typeATickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
return job;
}, _options);
_typeBTickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
await job.Tick();
return job;
}, _options);
_writeBlock = new TransformBlock<Job, Job>(job =>
{
Console.WriteLine(job.Ticker);
return job;
}, _options);
_finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);
_intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
{
job.IntermediateCleanUp();
return job;
}, _options);
}
private void ConfigureBlocks()
{
_preparationsBlock.LinkTo(_balancerBlock, _linkOptions);
_balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
_balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);
_typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
_typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());
_typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());
_writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
_writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());
_intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}
}
this is my Dataflow Pipeline representing my "artwork" above :D. All of this is executed in my Scheduler that is started in the Programm.cs:
public class Scheduler
{
private readonly Timer _timer;
private readonly Dataflow _flow;
public Scheduler(int intervall)
{
_timer = new Timer(intervall);
_flow = new Dataflow();
}
public void Start()
{
_timer.AutoReset = false;
_timer.Elapsed += _timer_Elapsed;
_timer.Start();
}
private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
try
{
_timer.Stop();
Console.WriteLine("Timer stopped");
await _flow.Process().ConfigureAwait(false);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
Console.WriteLine("Timer started again.");
_timer.Start();
}
}
}
class Program
{
static void Main(string[] args)
{
var scheduler = new Scheduler(1000);
scheduler.Start();
Console.ReadKey();
}
}
The Console Output i am getting is: Timer stopped Preparing Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Trying to commit Trying to finish
It seems like the Programm has stopped working at that point because I am not hitting any Breakpoints or getting any further. I think all my Blocks already received a Completion Signal and therefore stop taking any new Items. Therefore my Question is: How do I manage the Completion signal so that the pipeline only finishes when there is no more work to do?
Timer stopped Preparing Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Ticking Trying to commit Trying to finish Trying to commit Trying to finish
, she says it sounds like a computer having sex :) :) – LuttrelltypeBTickBlock
is backing up your pipeline. It has capacity of 4 and will not release it's jobs anywhere and will eventually back up the upstream blocks. Debug and check your buffers you may find they're full of type B's. – Nebo