I am new to TPL Dataflow.
I am trying to build a throttled async update to a fairly fast flowing input stream. BufferBlock seemed a nice match for this with the idea that I could call the ReceiveAll() to grab everything off the buffer and on the occasions there is nothing there I can await on a ReceiveAsync() to pick up the next element as it arrives.
But it seems to sometimes hang on the ReceiveAsync() call; and the conditions of failure are odd.
Please note I am interested in why this hangs. I have already found another way to make the application I am working on work, but it is possibly not as neat or extensible as I am not using TPL Dataflow since I obviously don't understand how it works.
Further note The key usage here is that I do a TryReceiveAll() then await ReceiveAsync() if that fails. This is a common pattern where burst data arrives and I want to process data so far as a batch. Which is the reason that I don't want to just loop on the ReceiveAsync(), and thus why just directly hooking an ActionBlock or a TransformBlock would not work. If I remove the TryReceiveAll() my version seems to work as expected; though, as other comments have noted the behaviour, seems to be different for different people so this may be a coincidence.
Here is a failing example... drop it in a console app with System.Threading.Tasks.Dataflow.dll referenced and usings:
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Failing example:
class Program
{
static void Main(string[] args)
{
var context = new CancellationTokenSource();
var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);
// shove 10 things onto the buffer
for(int i = 0; i < 10; i++)
{
// shove something on the buffer every second
buffer.Post(i);
Thread.Sleep(1000);
}
context.Cancel();
}
// 1. We expect to see something hit the buffer and go immediately through on the TryReceiveAll
// 2. Then we should see nothing for a second
// 3. Then we immediately process the next element from the await ReceiveAsync.
// 4. We should then repeat 2 & 3 'till i == 10 as there will be nothign for TryReceiveAll.
static async Task ProcessBuffer(BufferBlock<int> buffer, CancellationToken token)
{
try
{
while (!token.IsCancellationRequested)
{
Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
IList<int> elements;
if (!buffer.TryReceiveAll(out elements))
{
try
{
var element = await buffer.ReceiveAsync(TimeSpan.FromMilliseconds(5000), token);
elements = new List<int> { element };
}
catch (TimeoutException) { Console.WriteLine("Failed to get anything on await..."); }
}
if (elements != null) Console.WriteLine("Elements: " + string.Join(", ", elements));
}
}
catch (Exception e) { Console.WriteLine("Exception in thread: " + e.ToString()); }
}
}
output is:
11:27: This Breaks it...
Elements: 0
11:27: This Breaks it...
Failed to get anything on await...
11:27: This Breaks it...
Elements: 1, 2, 3, 4, 5
...And so on
But if I change the log line
Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
to
Console.WriteLine("This Works...");
The the output comes out as expected:
This Works...
Elements: 0
This Works...
Elements: 1
This Works...
Elements: 2
This Works...
etc. But even the act of copying text from the console will switch it back to the failing output.
Ideas?
ProcessBuffer
and simply link theBufferBlock
to anActionBlock
orTransformBlock
. – OnitaonlookerDateTime
value that causes the issue. You can retrieveDateTime.Now
without causing the issue, and you can print to the console text of the exact same length. To further confuse the question, I have found that sometimes it still works fine, and that if I either don't format theDateTime
value or don't print it, it also works fine. – TermagantBufferBlock
object or a related class was trying to be clever, attempted to write a fancy synchronization implementation (e.g. lockless, spin-wait, etc.) and introduced a race condition that breaks the wait in certain cases. For what it's worth, if I flip the threads by callingProcessBuffer()
directly and running thePost()
loop in aTask.Run()
, I am unable to reproduce the problem at all. – Termagant