BufferBlock deadlock with OutputAvailableAsync after TryReceiveAll
Asked Answered
H

2

23

While working on an answer to this question, I wrote this snippet:

var buffer = new BufferBlock<object>();
var producer = Task.Run(async () =>
{
    while (true)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        buffer.Post(null);
        Console.WriteLine("Post " + buffer.Count);
    }
});
var consumer = Task.Run(async () =>
{
    while (await buffer.OutputAvailableAsync())
    {
        IList<object> items;
        buffer.TryReceiveAll(out items);
        Console.WriteLine("TryReceiveAll " + buffer.Count);
    }
});
await Task.WhenAll(consumer, producer);

The producer should post items to the buffer every 100 ms and the consumer should clear all items out of the buffer and asynchronously wait for more items to show up.

What actually happens is that the producer clears all items once, and then never again moves beyond OutputAvailableAsync. If I switch the consumer to remove items one by one it works as excepted:

while (await buffer.OutputAvailableAsync())
{
    object item;
    while (buffer.TryReceive(out item)) ;
}

Am I misunderstanding something? If not, what is the problem?

Hokanson answered 16/8, 2014 at 9:58 Comment(4)
Looks like a bug to me. The most charitable explanation you could muster is that it only should be called when the producer has called Complete(). Makes little sense of course :) I think the bug is caused by a missing call to OfferAsyncIfNecessary(), present in TryReceive() but awol in TryReceiveAll(). Not the only problem, message number counting looks messed up as well. Post this to connect.microsoft.comBocanegra
@HansPassant DoneHokanson
Have you tried getting the await out of the while condition and replace it with true, and insert the await in the loop ... ?Tyne
@Tyne no. Because that would create an infinite loop.Hokanson
H
12

This is a bug in SourceCore being used internally by BufferBlock. Its TryReceiveAll method doesn't turn on the _enableOffering boolean data member while TryReceive does. That results in the task returned from OutputAvailableAsync never completing.

Here's a minimal reproduce:

var buffer = new BufferBlock<object>();
buffer.Post(null);

IList<object> items;
buffer.TryReceiveAll(out items);

var outputAvailableAsync = buffer.OutputAvailableAsync();
buffer.Post(null);

await outputAvailableAsync; // Never completes

I've just fixed it in the .Net core repository with this pull request. Hopefully the fix finds itself in the nuget package soon.

Hokanson answered 8/12, 2014 at 9:34 Comment(0)
C
2

Alas, it's the end of September 2015, and although i3arnon fixed the error it is not solved in the version that was released two days after the error was fixed: Microsoft TPL Dataflow version 4.5.24.

However IReceivableSourceBlock.TryReceive(...) works correctly. An extension method will solve the problem. After a new release of TPL Dataflow it will be easy to change the extension method.

/// <summary>
/// This extension method returns all available items in the IReceivableSourceBlock
/// or an empty sequence if nothing is available. The functin does not wait.
/// </summary>
/// <typeparam name="T">The type of items stored in the IReceivableSourceBlock</typeparam>
/// <param name="buffer">the source where the items should be extracted from </param>
/// <returns>The IList with the received items. Empty if no items were available</returns>
public static IList<T> TryReceiveAllEx<T>(this IReceivableSourceBlock<T> buffer)
{
    /* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll
     * Hence this function uses TryReceive until nothing is available anymore
     * */
    IList<T> receivedItems = new List<T>();
    T receivedItem = default(T);
    while (buffer.TryReceive<T>(out receivedItem))
    {
        receivedItems.Add(receivedItem);
    }
    return receivedItems;
}

usage:

while (await this.bufferBlock.OutputAvailableAsync())
{
    // some data available
    var receivedItems = this.bufferBlock.TryReceiveAllEx();
    if (receivedItems.Any())
    {
        ProcessReceivedItems(bufferBlock);
    }
}
Cinerary answered 28/9, 2015 at 10:21 Comment(2)
I think the behaviour of TryReceiveAllEx() and TryReceiveAll() would be different in the face of multiple consumers. TryReceiveAll() would (I presume?) get a single contiguous block whereas the ex method would get essentially a scatter-shot sampling, although both would leave the block empty.Sanhedrin
Maybe you're right about different behaviour with multiple listeners. The current version seriously is different, because TryReceiveAll doesn't work at all, for no one. If you use this method the changes you'll have to make when the bug is fixed will be minimal, only on this method.Cinerary

© 2022 - 2024 — McMap. All rights reserved.