Why does BufferBlock<T>.ReceiveAsync() hang when data is available?

I am new to TPL Dataflow.

I am trying to build a throttled async update for a fairly fast-flowing input stream. BufferBlock seemed a nice match for this: I could call TryReceiveAll() to grab everything off the buffer and, on the occasions when there is nothing there, await ReceiveAsync() to pick up the next element as it arrives.

But it sometimes seems to hang on the ReceiveAsync() call, and the conditions under which it fails are odd.

Please note that I am interested in why this hangs. I have already found another way to make my application work, but it is possibly not as neat or extensible, because it does not use TPL Dataflow, which I obviously don't understand well enough.

Further note: the key usage here is that I call TryReceiveAll() and then await ReceiveAsync() if that fails. This is a common pattern where data arrives in bursts and I want to process everything received so far as a batch. That is why I don't want to just loop on ReceiveAsync(), and why directly hooking up an ActionBlock or a TransformBlock would not work. If I remove the TryReceiveAll(), my version seems to work as expected, though, as other comments have noted, the behaviour seems to be different for different people, so this may be a coincidence.

Here is a failing example... drop it in a console app with System.Threading.Tasks.Dataflow.dll referenced and usings:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

Failing example:

class Program
{
    static void Main(string[] args)
    {
        var context = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
        var task = Task.Run(() => ProcessBuffer(buffer, context.Token), context.Token);

        // shove 10 things onto the buffer
        for (int i = 0; i < 10; i++)
        {
            // shove something on the buffer every second
            buffer.Post(i);
            Thread.Sleep(1000);
        }
        context.Cancel();
    }

    // 1. We expect to see something hit the buffer and go immediately through on the TryReceiveAll.
    // 2. Then we should see nothing for a second.
    // 3. Then we immediately process the next element from the await ReceiveAsync.
    // 4. We should then repeat 2 & 3 until i == 10, as there will be nothing for TryReceiveAll.
    static async Task ProcessBuffer(BufferBlock<int> buffer, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
                IList<int> elements;
                if (!buffer.TryReceiveAll(out elements))
                {
                    try
                    {
                        var element = await buffer.ReceiveAsync(TimeSpan.FromMilliseconds(5000), token);
                        elements = new List<int> { element };
                    }
                    catch (TimeoutException) { Console.WriteLine("Failed to get anything on await..."); }
                }
                if (elements != null) Console.WriteLine("Elements: " + string.Join(", ", elements));
            }
        }
        catch (Exception e) { Console.WriteLine("Exception in thread: " + e.ToString()); }
    }
}

output is:

11:27: This Breaks it...
Elements: 0
11:27: This Breaks it...
Failed to get anything on await...
11:27: This Breaks it...
Elements: 1, 2, 3, 4, 5

...And so on

But if I change the log line

Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");

to

Console.WriteLine("This Works...");

Then the output comes out as expected:

This Works...
Elements: 0
This Works...
Elements: 1
This Works...
Elements: 2
This Works...

etc. But even the act of copying text from the console will switch it back to the failing output.

Ideas?

Stopover answered 16/10, 2017 at 10:39 Comment(6)
What exactly are you trying to accomplish? You could remove ProcessBuffer and simply link the BufferBlock to an ActionBlock or TransformBlock. - Onitaonlooker
Well, the question is why this seems to hang, which I am interested in even if there is another way to achieve the system goal that lies behind this. The above code is a sample to reproduce the issue. It has nothing to do with the real system I am building. As to what I am doing: I've got an inbound flow of data that I am caching, and I signal new data over the buffer. I have a separate task that will then periodically lift all updates and push them out through several client streams. - Stopover
Maybe someone who is an expert with the Dataflow API could explain this, but it looks to me like a straight-up bug, and an odd one at that. It is the formatting and printing of the DateTime value that causes the issue. You can retrieve DateTime.Now without causing the issue, and you can print to the console text of the exact same length. To further confuse the question, I have found that sometimes it still works fine, and that if I either don't format the DateTime value or don't print it, it also works fine. - Termagant
I suspect someone implementing the BufferBlock object or a related class was trying to be clever, attempted to write a fancy synchronization implementation (e.g. lockless, spin-wait, etc.) and introduced a race condition that breaks the wait in certain cases. For what it's worth, if I flip the threads by calling ProcessBuffer() directly and running the Post() loop in a Task.Run(), I am unable to reproduce the problem at all. - Termagant
I'm not able to reproduce with v4.5.24 of Tpl.Dataflow and a straight copy & paste into a .NET 4.6.1 console app. Any chance you can post a complete example somewhere? - Eyrie
Scratch that; sorry, reproduced now. It's very intermittent for me and never happens with the debugger attached (at least not so far). - Eyrie
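
For reference, the "flipped" arrangement mentioned in the comments (ProcessBuffer() called directly from Main, with the Post() loop moved into Task.Run()) might look roughly like the sketch below. The `producer` variable and the use of GetAwaiter().GetResult() to block on the consumer are assumptions made for illustration; ProcessBuffer is the method from the question without the "This Breaks it..." log line. This only shows the setup the commenter describes. It does not explain the hang, and since the behaviour was reported as intermittent, it may not avoid the problem on every machine.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        var context = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });

        // Assumption: the producer loop now runs on a thread-pool thread.
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                buffer.Post(i);
                Thread.Sleep(1000);
            }
            context.Cancel();
        });

        // The consumer is started directly from Main and blocked on until it finishes.
        ProcessBuffer(buffer, context.Token).GetAwaiter().GetResult();
        producer.Wait();
    }

    // Same consumer as in the question: drain a batch if one is available,
    // otherwise wait up to five seconds for the next single element.
    static async Task ProcessBuffer(BufferBlock<int> buffer, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                IList<int> elements;
                if (!buffer.TryReceiveAll(out elements))
                {
                    try
                    {
                        var element = await buffer.ReceiveAsync(TimeSpan.FromMilliseconds(5000), token);
                        elements = new List<int> { element };
                    }
                    catch (TimeoutException) { Console.WriteLine("Failed to get anything on await..."); }
                }
                if (elements != null) Console.WriteLine("Elements: " + string.Join(", ", elements));
            }
        }
        // Cancellation at the end of the run surfaces here as an exception and is just logged.
        catch (Exception e) { Console.WriteLine("Exception in thread: " + e.ToString()); }
    }
}
```

Note that in a console app there is no synchronization context, so after the first await the rest of ProcessBuffer continues on a thread-pool thread in either arrangement; only the initial TryReceiveAll() and the start of the first ReceiveAsync() actually run on the main thread here.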
