I'm trying to get my head around controlling dataflow in TPL Dataflow. I have a very fast producer, and a very slow consumer. (My real code is more complex, but none the less, this is a pretty good model and it reproduces the problem.)
When I run it, the code starts drinking memory like it's going out of style--and the output queue on the producer fills up as fast as it can. What I'd really prefer to see is the Producer stop running for a while, until the Consumer has a chance to ask for it. From my readings of the documentation, this is what is supposed to happen: that is, I thought that the producer waits until the consumer has space.
This isn't the case, clearly. How do I fix it so that the queue doesn't go crazy?
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
namespace MemoryLeakTestCase
{
class Program
{
static void Main(string[] args)
{
var CreateData = new TransformManyBlock<int, string>(ignore =>
{
return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
});
var ParseFile = new TransformManyBlock<string, string>(fileContent =>
{
Thread.Sleep(1000);
return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
);
var EndOfTheLine = new ActionBlock<object>(f =>
{
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
CreateData.LinkTo(ParseFile, linkOptions);
ParseFile.LinkTo(EndOfTheLine, linkOptions);
Task t = new Task(() =>
{
while (true)
{
Console.WriteLine("CreateData: " + Report(CreateData));
Console.WriteLine("ParseData: " + Report(ParseFile));
Console.WriteLine("NullTarget: " + EndOfTheLine.InputCount );
Thread.Sleep(1000);
}
});
t.Start();
CreateData.SendAsync(0);
CreateData.Complete();
EndOfTheLine.Completion.Wait();
}
public static string Report<T, U>(TransformManyBlock<T, U> block)
{
return String.Format("INPUT: {0} OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
}
}
}
BoundedCapacity
onParseFile
to something a little smaller like maybe 10 just to make sure it works? – SomervilleCreateData
even bigger. The major challenge I'm trying to solve here is how to make sure thatCreateData
pauses when theParseFile
queue is too big (or when its own output queue is too big). – VelmaTransformManyBlock
implementation that is output bounded and supports allExecutionDataflowBlockOptions
, look here. – Greenland