BlockingCollection<T> batching using TPL DataFlow [duplicate]

Is there a way to batch items coming out of a BlockingCollection? For example:

I have a messaging bus publisher calling blockingCollection.Add()

And a consuming thread which is created like this:

Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        Console.WriteLine(value);
    }
});

However, I only want to write to the Console once the blocking collection holds 10 items, whereas GetConsumingEnumerable() yields each item as soon as it is added. I could write my own queue for this, but I'd like to use the blocking collection if possible.

Osteoblast answered 11/2, 2014 at 14:1 Comment(1)
@HansPassant: But that will result in items beyond the 10th not being processed...? - Gushy

Not sure what the project requirements are but I'd recommend TPL DataFlow BatchBlock.

You would instantiate a BatchBlock<string>, link it to an ActionBlock<string[]> (the batch block emits arrays) and then post to the batch block.

A sketch might look something like this:

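// Requires: using System.Threading.Tasks.Dataflow;
var bb = new BatchBlock<string>(10);
var ab = new ActionBlock<string[]>(msgArray =>
{
    foreach (var msg in msgArray)
        Console.WriteLine(msg);
});

// Propagate completion so that a final, partial batch still reaches ab.
bb.LinkTo(ab, new DataflowLinkOptions { PropagateCompletion = true });

foreach (string value in blockingCollection.GetConsumingEnumerable())
{
    bb.Post(value);
}

// Reached only after blockingCollection.CompleteAdding() has been called.
bb.Complete();
ab.Completion.Wait();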

With DataFlow you might even want to replace the BlockingCollection with a BufferBlock, or just post to the batch block directly without first adding to the blocking collection, since the batch block is already thread-safe.
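
As a rough illustration of posting straight to the batch block, here is a minimal sketch. The three publisher tasks and the message strings are made up for the example, and it assumes the System.Threading.Tasks.Dataflow package and a compiler that supports async Main (C# 7.1 or later).

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        var batcher = new BatchBlock<string>(10);
        var printer = new ActionBlock<string[]>(batch =>
            Console.WriteLine($"[{batch.Length}] {string.Join(", ", batch)}"));

        // Forward completion so the last, smaller batch is not lost.
        batcher.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

        // Three hypothetical publishers posting concurrently, 8 messages each.
        var publishers = Enumerable.Range(0, 3).Select(p => Task.Run(() =>
        {
            for (int i = 0; i < 8; i++)
                batcher.Post($"p{p}-m{i}");
        })).ToArray();

        await Task.WhenAll(publishers);

        // Signal that no more items are coming, then wait for the printer to drain.
        batcher.Complete();
        await printer.Completion;
    }
}
```

With 24 messages and a batch size of 10, the printer should receive two full batches and, once Complete() is called, one final batch of 4. The order of messages inside each batch depends on how the publisher tasks interleave.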

Isobar answered 11/2, 2014 at 14:38 Comment(4)
Thanks Dmitri. I'm getting a type conversion problem on bb.LinkTo(ab): it cannot convert BatchBlock<string> to ISourceBlock<string>... - Osteoblast
My bad. ActionBlock<string> must be ActionBlock<string[]>, because it receives an array of strings from the batch block. See the updated answer. - Isobar
This one is marked correct because I got to learn something new with TPL DataFlow! Thanks Dmitri. - Osteoblast
@Isobar According to MSDN, BatchBlock is not thread-safe (see the bottom of the linked page). - Peursem

A quick solution would be something like this:

public class ConsoleQueue
{
    private readonly List<string> _values = new List<string>();

    public void FlushQueueIfFull()
    {
        if (_values.Count < 10) return;
        foreach (var value in _values)
        {
            Console.WriteLine(value);
        }
        _values.Clear();
    }

    public void Push(string message)
    {
        _values.Add(message);
        FlushQueueIfFull();
    }
}

Then you can use it like this:

var queue = new ConsoleQueue();

Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        queue.Push(value);
    }
});

You can easily extend it to cover thread safety, flushing a partial batch, and so on.
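
One possible way to do that extension is sketched below. The class name SynchronizedConsoleQueue, the batchSize parameter and the explicit Flush() method are assumptions added for illustration and are not part of the class above; a single lock guards the list so that several threads can call Push.

```csharp
using System;
using System.Collections.Generic;

public class SynchronizedConsoleQueue
{
    private readonly object _gate = new object();
    private readonly List<string> _values = new List<string>();
    private readonly int _batchSize;

    public SynchronizedConsoleQueue(int batchSize = 10)
    {
        _batchSize = batchSize;
    }

    // Safe to call from several threads; writes a batch once it is full.
    public void Push(string message)
    {
        lock (_gate)
        {
            _values.Add(message);
            if (_values.Count >= _batchSize) FlushLocked();
        }
    }

    // Writes out whatever is buffered, e.g. after the producer has finished.
    public void Flush()
    {
        lock (_gate) FlushLocked();
    }

    // Must only be called while holding _gate.
    private void FlushLocked()
    {
        foreach (var value in _values) Console.WriteLine(value);
        _values.Clear();
    }
}
```

Calling Flush() after the GetConsumingEnumerable() loop ends would write any leftover items that never filled a complete batch. Writing to the console while holding the lock keeps each batch contiguous, at the cost of blocking other callers during the write.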

Squarely answered 11/2, 2014 at 14:27 Comment(1)
+1 - Another alternative would be to use DataFlow BatchBlock, which already implements thread-safe batching. - Isobar