TransformBlock never completes

I'm trying to wrap my head around "completion" in TPL Dataflow blocks. In particular, the TransformBlock doesn't seem to ever complete. Why?

Sample program

My code calculates the square of all integers from 1 to 1000. I used a BufferBlock and a TransformBlock for that. Later in my code, I await completion of the TransformBlock. The block never actually completes though, and I don't understand why.

static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    using (bufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            bufferBlock.Post(number);
        }

        bufferBlock.Complete();

        // This line never completes
        calculatorBlock.Completion.Wait();

        // Unreachable code
        IList<int> results;
        if (calculatorBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}

At first I thought I had created a deadlock, but that doesn't seem to be the case. When I inspected the calculatorBlock.Completion task in the debugger, its Status property was set to WaitingForActivation. That was the moment when my brain blue screened.

Sukey answered 28/11, 2014 at 10:47

The reason your pipeline hangs is that both BufferBlock and TransformBlock evidently don't complete until they have emptied themselves of items (I guess that's the desired behavior of IPropagatorBlocks, although I haven't found documentation on it).

This can be verified with a more minimal example:

var bufferBlock = new BufferBlock<int>();
bufferBlock.Post(0);
bufferBlock.Complete();
bufferBlock.Completion.Wait();

This blocks indefinitely unless you add bufferBlock.Receive(); before completing.
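Here is a self-contained version of that minimal example, as a sketch. It assumes System.Threading.Tasks.Dataflow is referenced (it ships with recent .NET versions and is also available as a NuGet package), and the Check method name is hypothetical. It calls Completion.Wait with a timeout instead of an unbounded Wait, so the program reports the pending state instead of hanging:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Hypothetical helper: reports whether Completion finished while an item was
    // still buffered, and whether it finished after that item was received.
    public static (bool CompletedWhileFull, bool CompletedAfterReceive) Check()
    {
        var bufferBlock = new BufferBlock<int>();
        bufferBlock.Post(0);
        bufferBlock.Complete();

        // The buffered item keeps the block from completing, so this times out.
        bool completedWhileFull = bufferBlock.Completion.Wait(TimeSpan.FromMilliseconds(500));

        // Taking the last item out lets the block reach its final state.
        bufferBlock.Receive();
        bool completedAfterReceive = bufferBlock.Completion.Wait(TimeSpan.FromSeconds(5));

        return (completedWhileFull, completedAfterReceive);
    }

    static void Main()
    {
        var (whileFull, afterReceive) = Check();
        Console.WriteLine($"Completed while full: {whileFull}");        // False
        Console.WriteLine($"Completed after Receive: {afterReceive}"); // True
    }
}
```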

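If you remove the items from your pipeline before blocking, whether with your TryReceiveAll code block, by linking another ActionBlock to the pipeline, by converting your TransformBlock to an ActionBlock, or in any other way, the Wait call will no longer block. Keep in mind that TryReceiveAll only takes the results that are already in the output buffer, so it has to be repeated until the block has produced all of them.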
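One of those "other ways" is to drain the TransformBlock while it is still working. The sketch below (the SquareAllAsync name is hypothetical) uses the OutputAvailableAsync and TryReceive extension methods from System.Threading.Tasks.Dataflow. OutputAvailableAsync returns false once the source has completed and no further output will arrive, which ends the loop:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Hypothetical helper: squares 1..count and keeps draining the TransformBlock
    // while it works, so its output buffer can empty and Completion can finish.
    public static async Task<List<int>> SquareAllAsync(int count)
    {
        var calculatorBlock = new TransformBlock<int, int>(
            i => i * i,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        foreach (var number in Enumerable.Range(1, count))
        {
            calculatorBlock.Post(number);
        }
        calculatorBlock.Complete();

        var results = new List<int>();
        // Loops until the block has completed and its output buffer is empty.
        while (await calculatorBlock.OutputAvailableAsync())
        {
            while (calculatorBlock.TryReceive(out int result))
            {
                results.Add(result);
            }
        }

        await calculatorBlock.Completion; // finishes now instead of hanging
        return results;
    }

    static async Task Main()
    {
        var results = await SquareAllAsync(1000);
        Console.WriteLine($"Received {results.Count} squares; last = {results[results.Count - 1]}");
    }
}
```

Because EnsureOrdered defaults to true, the results come out in input order even with MaxDegreeOfParallelism = 8.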


As for your specific program, it seems that you don't need a BufferBlock or a TransformBlock at all, since every block has its own input queue and you don't do anything with the TransformBlock's results other than print them. This could be achieved with just an ActionBlock:

var block = new ActionBlock<int>(
    i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        Console.WriteLine("x² = {0}", (int)Math.Pow(i, 2));
    },
    new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 8});
foreach (var number in Enumerable.Range(1, 1000))
{
    block.Post(number);
}
block.Complete();
block.Completion.Wait();

Beowulf answered 28/11, 2014 at 11:52
The documentation only had this to say: "After Complete has been called on a dataflow block, that block will complete, and its Completion task will enter a final state after it has processed all previously available data." I assumed that "processed" meant "transformed". I would never have guessed that it meant "transformed and received". – Sukey
@StevenLiekens I was quite surprised by that myself. I've been testing BufferBlock and looking at the source code since you asked this question. – Beowulf

I think I understand it now. An instance of TransformBlock is not considered "complete" until the following conditions are met:

  1. TransformBlock.Complete() has been called
  2. InputCount == 0 – the block has applied its transformation to every incoming element
  3. OutputCount == 0 – all transformed elements have left the output buffer

In my program, there is no target block that is linked to the source TransformBlock, so the source block never gets to flush its output buffer.
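The three conditions above can be observed directly on an unlinked block. This is a sketch with a hypothetical Inspect method, ten inputs instead of 1000, and arbitrarily chosen timeouts so nothing hangs. It waits until all ten results sit in the output buffer, checks that Completion is still pending, then empties the buffer with TryReceiveAll:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Hypothetical helper: posts ten numbers to an unlinked TransformBlock and
    // reports its counters and completion state before and after draining.
    public static (int InputCount, int OutputCount, bool CompletedBeforeDrain, bool CompletedAfterDrain) Inspect()
    {
        var calculatorBlock = new TransformBlock<int, int>(i => i * i);
        for (int i = 1; i <= 10; i++)
        {
            calculatorBlock.Post(i);
        }
        calculatorBlock.Complete(); // condition 1: Complete() has been called

        // Wait (up to 5 s) until every input has been transformed (condition 2).
        SpinWait.SpinUntil(() => calculatorBlock.OutputCount == 10, TimeSpan.FromSeconds(5));
        int inputCount = calculatorBlock.InputCount;
        int outputCount = calculatorBlock.OutputCount;

        // Condition 3 is not met yet: the results are still in the output buffer.
        bool completedBeforeDrain = calculatorBlock.Completion.Wait(TimeSpan.FromMilliseconds(500));

        // Emptying the output buffer satisfies condition 3.
        calculatorBlock.TryReceiveAll(out _);
        bool completedAfterDrain = calculatorBlock.Completion.Wait(TimeSpan.FromSeconds(5));

        return (inputCount, outputCount, completedBeforeDrain, completedAfterDrain);
    }

    static void Main()
    {
        var state = Inspect();
        Console.WriteLine($"InputCount={state.InputCount}, OutputCount={state.OutputCount}");
        Console.WriteLine($"Completed before drain: {state.CompletedBeforeDrain}");
        Console.WriteLine($"Completed after drain: {state.CompletedAfterDrain}");
    }
}
```

On a typical run this should show InputCount=0 and OutputCount=10, with Completion finishing only after the drain.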

As a workaround, I added a second BufferBlock that is used to store transformed elements.

static void Main(string[] args)
{
    var inputBufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    var outputBufferBlock = new BufferBlock<int>();
    using (inputBufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    using (calculatorBlock.LinkTo(outputBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            inputBufferBlock.Post(number);
        }

        inputBufferBlock.Complete();
        calculatorBlock.Completion.Wait();

        IList<int> results;
        if (outputBufferBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}
Sukey answered 28/11, 2014 at 11:51

A TransformBlock needs an ITargetBlock to which it can pass its transformed output.

var writeCustomerBlock = new ActionBlock<int>(c => Console.WriteLine(c));
transformBlock.LinkTo(
    writeCustomerBlock,
    new DataflowLinkOptions { PropagateCompletion = true });

With a linked target consuming its output, the TransformBlock completes.
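A complete program built around that snippet might look like the sketch below (RunPipelineAsync and the consumed counter are hypothetical additions). Awaiting the ActionBlock's Completion rather than the TransformBlock's ensures every result has also been written, because with PropagateCompletion the ActionBlock is only told to complete after the TransformBlock has finished, and it then processes whatever it still has buffered:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Hypothetical helper: links a TransformBlock to an ActionBlock so every
    // result has a consumer, then returns how many results were consumed.
    public static async Task<int> RunPipelineAsync(int count)
    {
        var transformBlock = new TransformBlock<int, int>(
            i => i * i,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        int consumed = 0;
        var writeBlock = new ActionBlock<int>(square =>
        {
            Console.WriteLine("x² = {0}", square);
            consumed++; // ActionBlock defaults to MaxDegreeOfParallelism = 1, so no locking needed
        });

        transformBlock.LinkTo(writeBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var number in Enumerable.Range(1, count))
        {
            transformBlock.Post(number);
        }
        transformBlock.Complete();

        // Await the last block in the pipeline: its completion implies all output was written.
        await writeBlock.Completion;
        return consumed;
    }

    static async Task Main()
    {
        int consumed = await RunPipelineAsync(1000);
        Console.WriteLine($"Consumed {consumed} results");
    }
}
```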

Vote answered 29/9, 2020 at 8:20
