How do I arrange flow control in TPL Dataflows?
I'm trying to get my head around controlling dataflow in TPL Dataflow. I have a very fast producer and a very slow consumer. (My real code is more complex, but nonetheless this is a pretty good model, and it reproduces the problem.)

When I run it, the code starts drinking memory like it's going out of style, and the output queue on the producer fills up as fast as it can. What I'd really prefer is for the Producer to stop running for a while, until the Consumer has a chance to ask for more. From my reading of the documentation, this is what is supposed to happen: that is, I thought the producer waits until the consumer has space.

This isn't the case, clearly. How do I fix it so that the queue doesn't go crazy?

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

namespace MemoryLeakTestCase
{
    class Program
    {

        static void Main(string[] args)
        {
            var CreateData = new TransformManyBlock<int, string>(ignore =>
            {
                return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
            });

            var ParseFile = new TransformManyBlock<string, string>(fileContent =>
            {
                Thread.Sleep(1000);
                return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
            }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
            );

            var EndOfTheLine = new ActionBlock<object>(f => { });


            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
            CreateData.LinkTo(ParseFile, linkOptions);
            ParseFile.LinkTo(EndOfTheLine, linkOptions);

            Task t = new Task(() =>
            {
                while (true)
                {
                    Console.WriteLine("CreateData: " + Report(CreateData));
                    Console.WriteLine("ParseData:  " + Report(ParseFile));
                    Console.WriteLine("NullTarget: " +  EndOfTheLine.InputCount );
                    Thread.Sleep(1000);
                }

            });
            t.Start();

            CreateData.SendAsync(0);
            CreateData.Complete();

            EndOfTheLine.Completion.Wait();
        }

        public static string Report<T, U>(TransformManyBlock<T, U> block)
        {
            return String.Format("INPUT: {0}   OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
        }


    }
}
Velma answered 21/12, 2013 at 5:28 Comment(3)
Have you tried lowering the BoundedCapacity on ParseFile to something a little smaller, like maybe 10, just to make sure it works? – Somerville
Lois, good thought. I have tried that; it just makes the queue on CreateData grow even bigger. The major challenge I'm trying to solve here is how to make sure that CreateData pauses when the ParseFile queue is too big (or when its own output queue is too big). – Velma
For an alternative TransformManyBlock implementation that is output bounded and supports all ExecutionDataflowBlockOptions, look here. – Greenland

Normally, what you would do in a situation like this is to also set BoundedCapacity of the CreateData block. But that won't work here, because TransformManyBlock doesn't seem to take BoundedCapacity into account when filling the output queue from a single IEnumerable.

What you can do instead is to create a function that iterates the collection and uses SendAsync() to send more data only when the target can accept them:

/// <remarks>
/// If iterating data throws an exception, the target block is faulted
/// and the returned Task completes successfully.
/// 
/// Depending on the usage, this might or might not be what you want.
/// </remarks>
public static async Task SendAllAsync<T>(
    this ITargetBlock<T> target, IEnumerable<T> data)
{
    try
    {
        foreach (var item in data)
        {
            await target.SendAsync(item);
        }
    }
    catch (Exception e)
    {
        target.Fault(e);
    }
}

Usage:

var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
await ParseFile.SendAllAsync(data);
ParseFile.Complete();
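Putting this together with the blocks from the question (a sketch, not a drop-in replacement: it assumes the SendAllAsync extension above is in scope and that the entry point is async, e.g. an `async Task Main`):

```csharp
// Sketch: drive the pipeline from the question via SendAllAsync,
// so the source enumerable is only advanced when ParseFile has room.
var ParseFile = new TransformManyBlock<string, string>(fileContent =>
{
    Thread.Sleep(1000); // simulate the slow consumer from the question
    return Enumerable.Range(0, 100).Select((s, i) => "Hello, " + i);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1000 });

var EndOfTheLine = new ActionBlock<object>(f => { });
ParseFile.LinkTo(EndOfTheLine,
    new DataflowLinkOptions { PropagateCompletion = true });

var data = Enumerable.Range(0, 1000 * 1000 * 1000)
                     .Select(i => "Hello, World " + i);
await ParseFile.SendAllAsync(data); // pauses while ParseFile's input queue is full
ParseFile.Complete();
await EndOfTheLine.Completion;
```

Note that the CreateData block disappears entirely in this variant; the driver code itself plays the producer role.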

If you still wanted to have a CreateData block that behaves similarly to your original code, you could create two bounded BufferBlocks, run SendAllAsync() between them, and then use Encapsulate() to make them look like one block:

/// <remarks>
/// boundedCapacity represents the capacity of the input queue
/// and the output queue separately, not their total.
/// </remarks>
public static IPropagatorBlock<TInput, TOutput>
    CreateBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
{
    var input = new BufferBlock<TInput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    var output = new BufferBlock<TOutput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

    Task.Run(
        async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                {
                    var data = transform(await input.ReceiveAsync());

                    await output.SendAllAsync(data);
                }

                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

    return DataflowBlock.Encapsulate(input, output);
}
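With this helper, the original Main needs only a small change (a sketch; the capacity of 100 is illustrative, and ParseFile/EndOfTheLine are wired up as in the question):

```csharp
// Sketch: a bounded replacement for the original CreateData block.
// Enumeration of the billion-element sequence now pauses whenever
// either of the internal buffers is full.
var CreateData = CreateBoundedTransformManyBlock<int, string>(
    ignore => Enumerable.Range(0, 1000 * 1000 * 1000)
                        .Select(i => "Hello, World " + i),
    boundedCapacity: 100);

CreateData.LinkTo(ParseFile,
    new DataflowLinkOptions { PropagateCompletion = true });
CreateData.Post(0);
CreateData.Complete();
```

Calling Complete() on the encapsulated block completes its input buffer; once the inner loop has drained it, the output buffer completes and (via PropagateCompletion) the rest of the pipeline follows.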
Hulbig answered 22/12, 2013 at 13:50 Comment(2)
Thank you for the response! That seems to solve the immediate problem, I think, although I'm still puzzling my way through your code. In particular, this model makes things happen in batch mode: I get a boundedCapacity worth of data at once, then it pours out. Then a new boundedCapacity. And so on. I'd rather see, if it were possible, each unit sent individually. – Velma
And, to articulate a little on @svick's comment: it turns out that TransformMany only checks queues BETWEEN input entries; in order to make this scenario work, I would need to check WITHIN an entry. – Velma
