How to implement continuously running dataflow blocks in TPL?

I have a producer/consumer dataflow pipeline set up using BufferBlock and ActionBlock, and it works fine inside a console application.

After adding all items into the BufferBlock and linking the BufferBlock to the other blocks, it works well.

Now I want to use this inside a service, where the dataflow pipeline is always up, and whenever messages arrive through external events they go into the BufferBlock and processing starts. How can I achieve this?

So far I have done the following:

public void SetupPipeline()
{
    // Standalone block for incoming messages; note that it is not linked
    // into the pipeline built below.
    FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    // Entry point of the pipeline.
    BufferBlock = new BufferBlock<WorkItem>();

    GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
    GroupingDataflowBlockOptions.Greedy = true;
    GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    GroupingDataflowBlockOptions.CancellationToken = CancellationToken;

    // Groups items into arrays of up to BoundingCapacity elements.
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

    // Processes each completed batch.
    ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
        ProcessWorkItems(WorkItems.ToList<WorkItem>()),
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = CancellationToken
        });

    // Flushes a partial batch out of the BatchBlock when the timer fires.
    Timer = new Timer(_ => BatchBlock.TriggerBatch());

    // Pass-through block that re-arms the timer for every item it sees.
    TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
        return WorkItem;
    }, new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken
    });

    // BufferBlock -> TimingBlock -> BatchBlock -> ProcessItems
    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);
}
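
For reference, when an external message arrives I post it into the BufferBlock roughly like this (the event handler signature and the WorkItem mapping below are placeholders for my actual message source):

// Placeholder handler: whatever raises the external event calls this.
// MessageEventArgs and its Body property are illustrative only.
private void OnMessageReceived(object sender, MessageEventArgs e)
{
    var item = new WorkItem(e.Body);

    // Post returns false if the block refuses the item (for example after Complete);
    // SendAsync could be awaited instead if the block were bounded.
    if (!BufferBlock.Post(item))
    {
        logger.Debug("BufferBlock did not accept the message: " + item.ToString());
    }
}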
Cushy answered 2/12, 2013 at 19:0 Comment(5)
Why don't you just do it? What have you tried, and how did that fail? – Industrious
@Industrious I have added what I have implemented so far. – Cushy
So, what's the problem? Does that code work as you expect? What's stopping you from posting any events into that pipeline? – Industrious
I can post messages; how can I achieve this without the Timer? I do not want to call Pipeline.Complete(), otherwise I will have to reinitialize the pipeline again, which I don't want (because I am trying to keep this pipeline open at all times). – Cushy
Related: How to call TriggerBatch automagically after a timeout if the number of queued items is less than the BatchSize? – Gumbotil

Your batch size is defined by the 'BoundingCapacity' variable passed to the BatchBlock constructor. A batch will be posted when:

  • A number of posts equal to the batch size (specified in the constructor) has been received
  • The batch block is marked for completion
  • The TriggerBatch method is called

It seems like you want a batch to post when the batch size is met or a timeout occurs. If this is the case, and if the batch size is not critical, I would really just give the timer you have a recurring interval and make the block downstream of the BatchBlock ignore empty posts.

What you may actually want, and what is most in line with the philosophy of dataflow programming, is to create a new batch block when you begin posting a series of items and then complete it when you are done or when a timeout occurs. New posts would create a new BatchBlock if one does not already exist.

The problem with trying to implement a timeout timer around the BatchBlock that fires only based on the first trigger is that you will either need to count and verify posts to the BufferBlock or you will need to watch posts coming out of the BufferBlock. Both of these scenarios will create a lot of ugliness and/or violate block encapsulation.
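
As a rough sketch of the encapsulation idea (the timeout trigger and the BatchBlock wrapped into a single IPropagatorBlock via DataflowBlock.Encapsulate), something like the following could work; the helper name and parameters are just illustrative, not part of the library:

// Illustrative helper: wraps a BatchBlock plus a timeout timer so callers
// only see a single IPropagatorBlock<T, T[]>.
public static IPropagatorBlock<T, T[]> CreateTimedBatchBlock<T>(int batchSize, TimeSpan timeout)
{
    var batchBlock = new BatchBlock<T>(batchSize);

    // Flushes whatever has accumulated when the timeout elapses.
    var timer = new Timer(_ => batchBlock.TriggerBatch());

    // Pass-through block that re-arms the timer on every incoming item
    // before forwarding it to the BatchBlock.
    var input = new TransformBlock<T, T>(item =>
    {
        timer.Change(timeout, Timeout.InfiniteTimeSpan);
        return item;
    });

    input.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // Items go in through 'input'; batches come out of 'batchBlock'.
    return DataflowBlock.Encapsulate(input, batchBlock);
}

Usage would then be along the lines of CreateTimedBatchBlock<WorkItem>(100, TimeSpan.FromSeconds(5)) linked between the BufferBlock and ProcessItems, so no timer logic leaks outside the block.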

Safari answered 16/12, 2013 at 19:45 Comment(4)
Thanks for your suggestions; I want to avoid creating a batch block every time. As you can see, my program basically converts chatty messages into chunks using the buffer. The buffer works great with BoundingCapacity; in my case I have set it to 100. But I don't want to wait until all 100 messages have arrived. I want dual control over the BufferBlock, e.g. flush when there are 100 messages or after 5 seconds (both configurable). The solution works for my needs, but I wanted to see if anyone else has a better one. The key here is that I want the BufferBlock to behave in dual mode: on hitting BoundingCapacity and on hitting a timeout. – Cushy
Once you pass an item to a dataflow block, you need to become ignorant of that item. Dataflow blocks are supposed to be data driven. External control is frowned upon. I would honestly modify what you have so that the 'timeout trigger' and the batch block are enclosed within a single IPropagatorBlock. – Safari
@VeteCoffee - this makes sense; this will compartmentalize the blocks and I can replace them with something else in the future... Thanks. – Cushy
Downvotes are fine, but please leave feedback to help me revise my answer. – Safari

As a gross oversimplification, Dataflow is a way to process a bunch of objects using a set of methods. It doesn't provide or expect any specific way of creating these objects.

If you want a pipeline to stay alive, just don't terminate the application. If you don't want to use a Console application, create a service that builds the pipeline and sends objects to it until it closes.

Messages are just objects that you will create by reading data, in response to events (whatever that means) or any other way.

As for external events, what do you mean by that? That someone will send data to your application? There are many ways this can happen:

  • If the data comes from another console application, you can pipe the results of one application to the other, parse data coming from the input stream of your command-line application, create messages and pass them to the pipeline
  • If you want a service listening for requests, you can host a .NET Pipe, WCF or Web API service to listen for calls and pass the posted data to the pipeline.
  • If the data comes from a database, you may be able to poll for changes and send any changed data to the pipeline.

The point is, Dataflow is about processing data, not about listening for events. It's not a full-blown distributed agent system, if that's what you were looking for.
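
For example, a bare-bones Windows service host for the pipeline from the question could look like the sketch below (SubscribeToExternalEvents is a placeholder for however your messages actually arrive, and the pipeline members are assumed to live on the same class):

// Sketch only: keeps the pipeline alive for the lifetime of the service.
// ServiceBase requires a reference to System.ServiceProcess.
public class DataflowHostService : ServiceBase
{
    protected override void OnStart(string[] args)
    {
        SetupPipeline();              // build and link the blocks once
        SubscribeToExternalEvents();  // placeholder: hook up whatever posts WorkItems
    }

    protected override void OnStop()
    {
        // Complete the entry block only when the service itself shuts down.
        // To drain cleanly, the links would also need PropagateCompletion = true
        // (or explicit Complete calls) before waiting on the last block.
        BufferBlock.Complete();
        ProcessItems.Completion.Wait();
    }
}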

Conventionalism answered 3/12, 2013 at 10:58 Comment(1)
External events, i.e. I have notification updates coming from a different channel, which I am posting to the BufferBlock. I have a 24/7 service up and running which listens for these incoming position updates and does some downstream processing based on them... – Cushy