Is it possible to have any dataflow block type send multiple intermediate results as a result of a single input?

Is it possible to get a TransformManyBlock to send intermediate results to the next step as they are created, instead of waiting for the entire IEnumerable<T> to be filled?

All testing I've done shows that TransformManyBlock only sends a result to the next block when it is finished; the next block then reads those items one at a time.

It seems like basic functionality but I can't find any examples of this anywhere.

The use case is processing chunks of a file as they are read. In my case the lines have to be processed in groups of a fixed size, so I need that many lines before I can process anything, and a direct stream won't work.
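A minimal sketch of this chunked use case as a single block, assuming a hypothetical chunk size of 3 and an in-memory sequence standing in for the file. The selector hands its input to a local iterator, `Chunk` (a hypothetical helper, not part of TPL Dataflow), which yields each chunk as soon as it has collected enough lines. Emitting the trailing partial chunk is also an assumption. It uses top-level statements and assumes a reference to System.Threading.Tasks.Dataflow.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

// Hypothetical chunk size: the question only says "so many lines" are needed.
const int ChunkSize = 3;

var received = new List<string[]>();

// The selector returns an iterator, so each chunk is handed to the block's
// output (and offered to linked blocks) as soon as it is yielded.
var chunker = new TransformManyBlock<IEnumerable<string>, string[]>(
    lines => Chunk(lines, ChunkSize));

// Downstream "processing" step; with the default MaxDegreeOfParallelism of 1
// the list is only touched by one thread at a time.
var processor = new ActionBlock<string[]>(chunk => received.Add(chunk));

chunker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

// Stand-in for the file: 7 lines, so the last chunk is partial.
var fakeFile = Enumerable.Range(1, 7).Select(i => $"line {i}");
chunker.Post(fakeFile);
chunker.Complete();
processor.Completion.Wait();

foreach (var chunk in received)
    Console.WriteLine(string.Join(", ", chunk));

// Local iterator function (lambdas cannot contain `yield`).
static IEnumerable<string[]> Chunk(IEnumerable<string> lines, int size)
{
    var buffer = new List<string>(size);
    foreach (var line in lines)
    {
        buffer.Add(line);
        if (buffer.Count == size)
        {
            yield return buffer.ToArray(); // a full chunk goes downstream now
            buffer.Clear();
        }
    }
    if (buffer.Count > 0)
        yield return buffer.ToArray(); // assumption: leftover lines form a final partial chunk
}
```

On .NET 6 and later, `Enumerable.Chunk` from System.Linq is also deferred, so `lines => lines.Chunk(3)` should behave the same way; the hand-written iterator just makes the per-chunk `yield` explicit and leaves room for custom grouping rules.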

The kludge I've come up with is to create two pipelines:

  1. a "processing" dataflow network that processes the chunks of data as they become available

  2. a "producer" dataflow network that ends where the file is broken into chunks, which are then posted to the start of the "processing" network that actually transforms the data.

The "producer" network needs to be seeded with the starting point of the "processing" network.

This is not a good long-term solution, since additional processing options will be needed and it isn't flexible.

Is it possible to have any dataflow block type send multiple intermediate results, as they are created, in response to a single input? Any pointers to working code?

Hinckley asked 12/6, 2020 at 0:56
Take a look at a (now fixed) bug in TransformManyBlock that was discovered by someone who was probably using this block in the same way that you are trying to use it yourself: TPL Dataflow: Why does EnsureOrdered = false destroy parallelism for this TransformManyBlock? - Chane
TransformManyBlock doesn't behave that way. I don't have to test it; I often process files with thousands of files that way. This block, for example, would emit lines as it read them: new TransformManyBlock<string,string>(path=>File.ReadLines(path)) - Doubleheader
Please post actual code that demonstrates the issue. - Doubleheader

You probably need to create your IEnumerables by using an iterator. This way an item will be propagated downstream after every yield return statement. The only problem is that lambda functions can't contain yield in C#, so you'll have to use a local function instead. Example:

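using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks.Dataflow;

var block = new TransformManyBlock<string, string>(filePath => ReadLines(filePath));

// Local iterator function: the lambda above cannot contain `yield` itself
IEnumerable<string> ReadLines(string filePath)
{
    // Note: File.ReadAllLines loads the whole file up front; File.ReadLines streams it
    string[] lines = File.ReadAllLines(filePath);
    foreach (var line in lines)
    {
        yield return line; // Immediately offered to any linked block
    }
}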
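One way to check this behavior without relying on timing is a small experiment, sketched here with top-level statements and a reference to System.Threading.Tasks.Dataflow assumed. The hypothetical `Produce` iterator pauses after its first `yield return` until the linked `ActionBlock` signals that it has received that item, with a five-second timeout. If TransformManyBlock waited for the whole sequence before passing anything on, the wait would time out and the flag would stay false; with the default options (MaxDegreeOfParallelism = 1) the first item should arrive while the iterator is still paused.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Set by the downstream block when it receives the first item.
var firstItemArrived = new ManualResetEventSlim(false);
bool sawFirstBeforeSecondYield = false;

var producer = new TransformManyBlock<int, string>(count => Produce(count));
var consumer = new ActionBlock<string>(item =>
{
    Console.WriteLine($"consumer got {item}");
    if (item == "item 1") firstItemArrived.Set();
});
producer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

producer.Post(3);
producer.Complete();
consumer.Completion.Wait();

Console.WriteLine($"first item arrived before the iterator finished: {sawFirstBeforeSecondYield}");

// Hypothetical iterator: after yielding the first item it blocks until the
// consumer reports receiving it (or 5 seconds pass), then continues.
IEnumerable<string> Produce(int count)
{
    for (int i = 1; i <= count; i++)
    {
        yield return $"item {i}";
        if (i == 1)
            sawFirstBeforeSecondYield = firstItemArrived.Wait(TimeSpan.FromSeconds(5));
    }
}
```

This only exercises the default options. With MaxDegreeOfParallelism greater than 1 and EnsureOrdered = true, the block has to keep outputs in input order, so it is worth repeating the check with the options actually in use.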
Chane answered 12/6, 2020 at 5:42
@Hinckley probably because of the File.ReadAllLines I used in the example. File.ReadLines is much preferable, of course, especially for large files. - Chane
For me it was simplifying the lambda: refactoring the logic to push some maintenance work into the function so I could use yield in the function. I'm actually using StreamReader.ReadLine() in my function. - Hinckley
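A sketch of the approach described in the last comment: a local iterator function that reads with `StreamReader.ReadLine` and yields each line, so the selector lambda stays trivial. `ReadLinesLazily` is a hypothetical name, the temporary file exists only to make the example self-contained, and a reference to System.Threading.Tasks.Dataflow is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks.Dataflow;

// Write a small temporary file so the example is self-contained.
string path = Path.GetTempFileName();
File.WriteAllLines(path, new[] { "alpha", "beta", "gamma" });

var lines = new List<string>();

// The lambda only calls the local iterator; it contains no `yield` itself.
var reader = new TransformManyBlock<string, string>(filePath => ReadLinesLazily(filePath));
var collector = new ActionBlock<string>(line => lines.Add(line));
reader.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

reader.Post(path);
reader.Complete();
collector.Completion.Wait();
File.Delete(path); // the iterator has already disposed its StreamReader

Console.WriteLine(string.Join("|", lines));

// Reads one line at a time, so each line is yielded (and offered downstream)
// without loading the whole file first. `using` disposes the reader when the
// enumeration finishes.
static IEnumerable<string> ReadLinesLazily(string filePath)
{
    using var sr = new StreamReader(filePath);
    string? line;
    while ((line = sr.ReadLine()) != null)
        yield return line;
}
```

File.ReadLines already returns a lazily evaluated sequence, so `path => File.ReadLines(path)`, as suggested in the comments on the question, gives the same streaming behavior without a hand-written iterator. The explicit version is useful when the reading loop needs extra logic, such as grouping lines into fixed-size chunks.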
