Is there a TPL dataflow block that doesn't take input but returns output?
Asked Answered
W

2

4

The title of my question says it all.

I am looking for a TPL dataflow block that doesn't need an input.

Right now I am using a transform block but it's input is unused.

Wills answered 5/7, 2016 at 8:42 Comment(13)
No. Why do you need it too? Maybe we can create something like that.Xerophagy
My first block pulls data from a queue. It doesn't need any input to pull data from the queue.Wills
@BilalFazlani actually, it does have an input. The queue itself. A TransformManyBlock can receive the queue as input and return its contents as output. Or you could replace your queue with a BufferBlock (which is a queue as well). Or you could just use a loop that pops messages from the queue and posts them to the first blockTitle
@PanagiotisKanavos not really. The queue is fixed for all messages. There is no need for me to parameterize the queue because it never changes.Wills
@BilalFazlani global statics are a bad idea, and that's what this queue is if you hard-code it inside the block. Besides, why create a custom block when there is one that can do the job for you already? Finally, how can you test your mesh if the queue is hard-coded inside the first block?Title
Yes, I am using an Transform manyblock right now. But its like TransformManyBlock<int,Ienumerable<message>> and i just pass 0Wills
The queue is being injected from IoC.Wills
May be you are rightWills
Pass a reference to the queue then! In the delegate itself, use yield return to push each message immediately, eg queue=>{ while (!queue.Length==0){var msg=queue.Pop(); yield return msg;}}Title
I will just parameterize the queue if there is no other block that can give an output without inputWills
Its an aws SQS queue btw :) but I can parameterize it.Wills
@BilalFazlani you HAVE to parameterize it in this case to be able to test against a staging environment. I have quite a few stories about tests that accidentally used a production queue URL. In this case, the input can be the SQS URL (or the client class if you use an IoC container)Title
I will parameterize because it seems like a good idea. it but I don't HAVE to. that's because I am registering a real queue in the application start. In the test project I am overriding that registration with a fake queue. And our staging/testing environment don't have access to prod queue. But parameterizing make sense here given that there is no block that suits my requirement. using IoC is in a way parameterizing through constructor.Wills
H
5

I would build a block like this from a BufferBlock<T>: the method accepts a delegate that presents the ITargetBlock<T> side of the block and returns the ISourceBlock<T> side of it. This way, the delegate can send input to the block, but from the outside, it looks like a block that only produces output.

The code:

public static ISourceBlock<T> CreateProducerBlock<T>(
    Func<ITargetBlock<T>, Task> producer,
    int boundedCapacity = DataflowBlockOptions.Unbounded)
{
    var block = new BufferBlock<T>(
        new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

    Task.Run(async () =>
    {
        try
        {
            await producer(block);

            block.Complete();
        }
        catch (Exception ex)
        {
            ((IDataflowBlock)block).Fault(ex);
        }
    });

    return block;
}

Example usage:

var producer = CreateProducerBlock<int>(async target =>
{
    await target.SendAsync(10);
    await target.SendAsync(20);
});

ITargetBlock<int> consumer = …;

producer.LinkTo(consumer);
Homosexual answered 8/7, 2016 at 19:56 Comment(1)
I was looking for an example of ISourceBlock<T> implementation, but this is simple and elegant!Plush
C
0

Sometimes it is easiest to just use a throwaway bool as the input for a TransformManyBlock, and .Post(true) to kick off your pipeline.

Cutanddried answered 26/7, 2021 at 14:34 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.