TransformBlock posting to output

My scenario is that I have a BufferBlock<Stream> receiving Streams from an external source, say the file system or an FTP server. These file Streams pass into another block and undergo processing.

The only catch is that some of these files are zipped, and I would like to add a block in the middle that unzips files when necessary and creates an output Stream for each of its entries.

However, I do not want to use TransformManyBlock, because this would mean I have to fully receive the ZIP Stream and create the output Stream array all at once.

I would like this block to receive the ZIP Stream, start decompressing, and push each entry to the next block as soon as it is ready, so the processing block can start working as soon as the first file is decompressed instead of waiting until everything is decompressed.

How would I go about doing this?

Menswear answered 1/12, 2016 at 8:39
What library are you using for ZIP decompression? (Doyon)
Using System.IO.Compression.ZipFile. (Menswear)
So far I understand that my problem is actually the async part. If I weren't using async, I could just use yield return in the TransformManyBlock, but I can't use yield return together with async. (Menswear)
Can you share some code? (Doyon)
Related: Is it possible to have any dataflow block type send multiple intermediate results as a result of a single input? As a side note, in case you need to restrict memory usage, be aware that the output queue of the built-in TransformManyBlock is unbounded. Here is a related topic. (Cornell)

I realized my problem was not being able to use yield and async together. After refactoring, I got rid of the need for async and came up with the following (simplified) version:

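using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks.Dataflow;

// A lambda cannot contain 'yield return' (compiler error CS1621), so the iterator is a separate method.
static IEnumerable<Stream> UnzipEntries(Stream input)
{
    using var archive = new ZipArchive(input, ZipArchiveMode.Read, leaveOpen: true);
    foreach (ZipArchiveEntry entry in archive.Entries)
    {
        if (string.IsNullOrWhiteSpace(entry.Name)) // is a folder
            continue;

        // Copy the entry so downstream blocks never read the archive's shared stream concurrently.
        var copy = new MemoryStream();
        using (var entryStream = entry.Open())
            entryStream.CopyTo(copy);
        copy.Position = 0;
        yield return copy; // with default options, available downstream before the next entry is read
    }
}

var block = new TransformManyBlock<Stream, Stream>(input => UnzipEntries(input));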
Menswear answered 12/12, 2016 at 9:50
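A self-contained sketch of the iterator approach from this answer, runnable as a C# 9+ console program (top-level statements). It assumes a .NET version where `System.Threading.Tasks.Dataflow` is available (recent .NET ships it; otherwise add the `System.Threading.Tasks.Dataflow` NuGet package). The in-memory archive, its entry names, and the `UnzipEntries` helper are made up for the demo. Because C# does not allow `yield return` inside a lambda, the iterator is a local function, and each entry is copied into its own `MemoryStream` so the consumer never reads the archive's underlying stream while the iterator is opening the next entry.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks.Dataflow;

// Build a small ZIP in memory: one folder entry and two files (names are made up).
var zipBytes = new MemoryStream();
using (var zip = new ZipArchive(zipBytes, ZipArchiveMode.Create, leaveOpen: true))
{
    zip.CreateEntry("docs/"); // folder entry: its Name is empty
    foreach (var (path, text) in new[] { ("a.txt", "alpha"), ("docs/b.txt", "beta") })
    {
        using var sw = new StreamWriter(zip.CreateEntry(path).Open());
        sw.Write(text);
    }
}
zipBytes.Position = 0;

var received = new ConcurrentQueue<string>();
var unzipper = new TransformManyBlock<Stream, Stream>(input => UnzipEntries(input));
var processBlock = new ActionBlock<Stream>(s =>
{
    using var reader = new StreamReader(s);
    received.Enqueue(reader.ReadToEnd());
});
unzipper.LinkTo(processBlock, new DataflowLinkOptions { PropagateCompletion = true });

unzipper.Post(zipBytes);
unzipper.Complete();
await processBlock.Completion;

Console.WriteLine(string.Join(",", received)); // prints alpha,beta

// C# does not allow 'yield return' inside a lambda, so the iterator is a local function.
static IEnumerable<Stream> UnzipEntries(Stream input)
{
    using var archive = new ZipArchive(input, ZipArchiveMode.Read, leaveOpen: true);
    foreach (ZipArchiveEntry entry in archive.Entries)
    {
        if (string.IsNullOrWhiteSpace(entry.Name)) // folder entry
            continue;

        // Copy the entry into its own MemoryStream so the consumer never reads the
        // archive's underlying stream while the iterator is opening the next entry.
        var copy = new MemoryStream();
        using (var entryStream = entry.Open())
            entryStream.CopyTo(copy);
        copy.Position = 0;
        yield return copy;
    }
}
```

Since `TransformManyBlock` enumerates the returned `IEnumerable` lazily, each yielded stream can reach `processBlock` while later entries are still being decompressed. One caveat: the `ZipArchive` documentation states that in Read mode a non-seekable input (for example a network stream) is held entirely in memory, so with such inputs the entries are still emitted one at a time, but the archive itself is read up front.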

You can set up an intermediate block for your unzip logic and link the blocks with a predicate that checks whether a stream is an archive, so only archives are routed through the unzipper, something like this:

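using System.IO;
using System.Linq;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<Stream>();
var unzipper = new TransformManyBlock<Stream, Stream>(input => Enumerable.Empty<Stream>() /* unzip here */);
var processBlock = new ActionBlock<Stream>(input => { /* process streams here */ });

// Links are tried in the order they were created: archives go to the unzipper,
// everything else falls through to the unconditional link to processBlock.
buffer.LinkTo(unzipper, input => IsZipArchive(input)); // IsZipArchive: hypothetical helper, your own check
unzipper.LinkTo(processBlock);
buffer.LinkTo(processBlock);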
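A runnable version of the predicate-routing idea, as a sketch under stated assumptions. `IsZipArchive` is a hypothetical helper (not part of any library) that peeks at the first four bytes for the ZIP local file header signature `PK\x03\x04`; it only inspects seekable streams and rewinds them afterwards. `MakeZip` and `UnzipEntries` are illustrative helpers too. Because `processBlock` has two sources, completion is not propagated into it: propagating from either link would complete it while the other path may still have data, so it is completed by hand once both upstream blocks are done.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading.Tasks.Dataflow;

var processed = new ConcurrentQueue<string>();

var buffer = new BufferBlock<Stream>();
var unzipper = new TransformManyBlock<Stream, Stream>(input => UnzipEntries(input));
var processBlock = new ActionBlock<Stream>(s =>
{
    using var reader = new StreamReader(s);
    processed.Enqueue(reader.ReadToEnd());
});

// Links are tried in creation order: archives match the predicate and go to the
// unzipper; every other stream falls through to the unconditional link.
buffer.LinkTo(unzipper, new DataflowLinkOptions { PropagateCompletion = true }, IsZipArchive);
unzipper.LinkTo(processBlock);
buffer.LinkTo(processBlock);

buffer.Post(MakeZip(("a.txt", "alpha"), ("b.txt", "beta")));
buffer.Post(new MemoryStream(Encoding.UTF8.GetBytes("plain")));

// buffer's completion propagates to unzipper; processBlock is completed manually.
buffer.Complete();
await unzipper.Completion;
processBlock.Complete();
await processBlock.Completion;

var results = processed.ToArray();
Array.Sort(results, StringComparer.Ordinal); // arrival order between the two paths may vary
Console.WriteLine(string.Join(",", results)); // prints alpha,beta,plain

// Hypothetical helper: ZIP files normally start with the local file header
// signature "PK\x03\x04". Only seekable streams are inspected; the position is restored.
static bool IsZipArchive(Stream s)
{
    if (!s.CanSeek) return false;
    long start = s.Position;
    var header = new byte[4];
    int read = 0, n;
    while (read < header.Length && (n = s.Read(header, read, header.Length - read)) > 0)
        read += n;
    s.Position = start;
    return read == 4 && header[0] == 0x50 && header[1] == 0x4B && header[2] == 0x03 && header[3] == 0x04;
}

// Illustrative helper that builds an in-memory ZIP from (path, text) pairs.
static Stream MakeZip(params (string Path, string Text)[] files)
{
    var ms = new MemoryStream();
    using (var zip = new ZipArchive(ms, ZipArchiveMode.Create, leaveOpen: true))
    {
        foreach (var (path, text) in files)
        {
            using var sw = new StreamWriter(zip.CreateEntry(path).Open());
            sw.Write(text);
        }
    }
    ms.Position = 0;
    return ms;
}

// The unzip step: copy each file entry into its own MemoryStream and yield it.
static IEnumerable<Stream> UnzipEntries(Stream input)
{
    using var archive = new ZipArchive(input, ZipArchiveMode.Read, leaveOpen: true);
    foreach (ZipArchiveEntry entry in archive.Entries)
    {
        if (string.IsNullOrWhiteSpace(entry.Name)) continue; // skip folder entries
        var copy = new MemoryStream();
        using (var entryStream = entry.Open())
            entryStream.CopyTo(copy);
        copy.Position = 0;
        yield return copy;
    }
}
```

Checking a stream's leading bytes avoids relying on file names, which a bare `Stream` usually does not carry; if your source does expose names (for example a file path or FTP listing), filtering on the extension would be a simpler predicate.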

As for using async together with yield, you could give the AsyncEnumerable package (available on GitHub and NuGet) a try.

Fetich answered 26/1, 2017 at 17:34