How to link two C# APIs that expect you to provide a stream?

I am working with two C# stream APIs, one of which is a data source and the other of which is a data sink.

Neither API actually exposes a stream object; both expect you to pass a stream into them and they handle writing/reading from the stream.

Is there a way to link these APIs together such that the output of the source is streamed into the sink without having to buffer the entire source in a MemoryStream? This is a very RAM-sensitive application.

Here's an example that uses the MemoryStream approach that I'm trying to avoid, since it buffers the entire stream in RAM before writing it out to S3:

using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
    // Disposing parquetWriter finishes the file, and transferUtil closes
    // the stream, so we need this weird using nesting to keep everyone happy.
    using (var parquetWriter = new ParquetWriter(schema, buffer)) 
        using (var rowGroupWriter = parquetWriter.CreateRowGroup())
        {
            rowGroupWriter.WriteColumn(...);
            ...
        }
    transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}
Impossible answered 10/10, 2018 at 15:40 Comment(3)
What's stopping you from passing the stream that you passed into the data source API to the data sink API? Without seeing any code, it's very hard to figure out why this is a challenge. - Ferryboat
The source API writes to a stream based on data passed into its writer object via function calls (as demonstrated in the first link in my question); ideally you pass it a stream which represents the final destination for your data (which I don't have, since the sink API expects a stream instead of providing one). - Impossible
In the general case, it's not really possible. But in this case, from what I can see, you can create a TransferUtilityUploadRequest and set its InputStream, then run UploadAsync on this request (which you can Wait on if you don't want async). - Oldwife

You are looking for a stream that can be passed to both the data source and sink and that can 'transfer' the data between the two asynchronously. There are a number of possible solutions and I might have considered a producer-consumer pattern around a BlockingCollection.
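For comparison, the BlockingCollection hand-off could look roughly like the sketch below. The class and method names, the chunk size and the bounded capacity of 4 are all made up for illustration; to use this with the two APIs you would still have to wrap each end in a custom Stream, and every chunk costs a byte[] allocation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Hypothetical producer: pushes chunkCount chunks, then signals that no more will come.
    public static void Produce(BlockingCollection<byte[]> queue, int chunkCount, int chunkSize)
    {
        try
        {
            for (int i = 0; i < chunkCount; i++)
            {
                // Add blocks while the queue is at its bounded capacity,
                // which is what keeps memory usage capped.
                queue.Add(new byte[chunkSize]);
            }
        }
        finally
        {
            // Without this the consumer's enumeration would never end.
            queue.CompleteAdding();
        }
    }

    // Hypothetical consumer: drains the queue until CompleteAdding has been called.
    public static long Consume(BlockingCollection<byte[]> queue)
    {
        long total = 0;
        foreach (var chunk in queue.GetConsumingEnumerable())
        {
            total += chunk.Length;
        }
        return total;
    }

    public static long Run(int chunkCount, int chunkSize)
    {
        // At most 4 chunks are held in memory at any time.
        using (var queue = new BlockingCollection<byte[]>(boundedCapacity: 4))
        {
            var consumer = Task.Run(() => Consume(queue));
            var producer = Task.Run(() => Produce(queue, chunkCount, chunkSize));
            Task.WaitAll(producer, consumer);
            return consumer.Result;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run(chunkCount: 100, chunkSize: 4096)); // prints 409600
    }
}
```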

More recently, System.IO.Pipelines and the Span and Memory types were added with a focus on high-performance IO, and I think they would be a good fit here. The Pipe class, with its associated Reader and Writer, automatically handles the flow control, back pressure and IO between the two ends whilst utilising the new Span and Memory related types.
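To make the flow control concrete, here is a minimal sketch of a bare Pipe with one producer and one consumer. The thresholds, chunk sizes and the PipeDemo/RunAsync names are arbitrary choices for illustration, and the project needs a reference to System.IO.Pipelines if the target framework does not already include it.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeDemo
{
    public static async Task<long> RunAsync(int chunkCount, int chunkSize)
    {
        // FlushAsync stops completing once 64 KiB are unread and resumes
        // when the reader has drained the backlog below 32 KiB.
        var pipe = new Pipe(new PipeOptions(
            pauseWriterThreshold: 64 * 1024,
            resumeWriterThreshold: 32 * 1024));

        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < chunkCount; i++)
            {
                // Borrow memory from the pipe instead of allocating a byte[].
                Memory<byte> memory = pipe.Writer.GetMemory(chunkSize);
                memory.Span.Slice(0, chunkSize).Fill((byte)i);
                pipe.Writer.Advance(chunkSize);

                // Awaiting the flush is where back pressure is applied.
                await pipe.Writer.FlushAsync();
            }
            pipe.Writer.Complete(); // tells the reader no more data is coming
        });

        long total = 0;
        while (true)
        {
            ReadResult result = await pipe.Reader.ReadAsync();
            total += result.Buffer.Length;

            // Mark everything as consumed so the pipe can reuse the memory.
            pipe.Reader.AdvanceTo(result.Buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }
        pipe.Reader.Complete();

        await producer;
        return total;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(chunkCount: 100, chunkSize: 4096)); // prints 409600
    }
}
```

Putting a delay inside the read loop should make the producer stall at FlushAsync once the pause threshold is reached, so the pipe never holds much more than that amount of data.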

I have uploaded a Gist at PipeStream that will give you a custom stream with an internal Pipe implementation that you can pass to both your API classes. Whatever is written to the WriteAsync (or Write) method will be made available to the ReadAsync (or Read) method without requiring any further byte[] or MemoryStream allocations.

In your case you would simply substitute this new class for the MemoryStream and it should work out of the box. I haven't got a full S3 test working, but reading directly from the Parquet stream and dumping it to the console window shows that it works asynchronously.

// Create some very badly 'mocked' data
var idColumn = new DataColumn(
    new DataField<int>("id"),
    Enumerable.Range(0, 10000).Select(i => i).ToArray());

var cityColumn = new DataColumn(
    new DataField<string>("city"),
    Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());

var schema = new Schema(idColumn.Field, cityColumn.Field);

using (var pipeStream = new PipeStream())
{
    var buffer = new byte[4096];
    int read = 0;

    var readTask = Task.Run(async () =>
    {
        //transferUtil.Upload(pipeStream, "bucketName", "key"); // Execute this in a Task / Thread
        while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            var incoming = Encoding.ASCII.GetString(buffer, 0, read);
            Console.WriteLine(incoming);
            // await Task.Delay(5000); uncomment this to simulate very slow consumer
        }
    });

    using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
    using (var rowGroupWriter = parquetWriter.CreateRowGroup())
    {
        rowGroupWriter.WriteColumn(idColumn);  // Step through both these statements to see data read before the parquetWriter completes
        rowGroupWriter.WriteColumn(cityColumn);
    }       
}

The implementation is not completely finished, but I think it shows a nice approach. In 'readTask' you can uncomment the Task.Delay to simulate a slow reader (standing in for transferUtil), and you should see the pipe automatically throttle the write side.

You need to be using C# 7.2 or later (VS 2017 -> Project Properties -> Build -> Advanced -> Language Version) for one of the Span extension methods, but it should be compatible with any .NET Framework. You may need the NuGet package.

The stream is readable and writable (obviously!) but not seekable. That should work for you in this scenario, but it would not work for reading with the Parquet SDK, which requires seekable streams.

Hope it helps

Humidify answered 13/10, 2018 at 19:39 Comment(2)
I was prototyping a solution around a BlockingCollection, but this looks better-suited to the task. - Impossible
The gist was a good start but PipeStream (even if it worked) has a big problem because disposing of the stream is key to get the consumer to terminate, and the PipeStream is shared between producer and consumer. See Pipe.Reader.AsStream() and Pipe.Writer.AsStream() for a much nicer approach -- just make sure you dispose of the Writer.AsStream when the producer is done filling the stream. - Spraddle

Using System.IO.Pipelines it would look something like this:

var pipe = new System.IO.Pipelines.Pipe();
using (var buffer = pipe.Writer.AsStream())
using (var transferUtil = new TransferUtility(s3client))
{
    // we can start the consumer first because it will just block 
    // on the stream until data is available
    Task consumer = transferUtil.UploadAsync(pipe.Reader.AsStream(), _bucketName, _key.Replace(".gz", "") + ".parquet");
    // start a task to produce data
    Task producer = WriteParquetAsync(buffer, ..);
    // start pumping data; we can wait here because the producer will
    // necessarily finish before the consumer does
    await producer;
    // this is key; disposing of the buffer early here causes the consumer stream
    // to terminate, else it will just hang waiting on the stream to finish.
    // see the documentation for Writer.AsStream(bool leaveOpen = false)
    buffer.Dispose();
    // wait for the upload to finish
    await consumer;

}
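
If you want to try the pattern without S3 or Parquet, a self-contained sketch along these lines should do. ProduceAsync and ConsumeAsync are hypothetical stand-ins for the Parquet writer and TransferUtility (each only sees a Stream), and the sizes are arbitrary.

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeStreamBridgeDemo
{
    // Hypothetical stand-in for the data source: it only knows how to write to a Stream.
    public static async Task ProduceAsync(Stream destination, int chunkCount, int chunkSize)
    {
        var chunk = new byte[chunkSize];
        for (int i = 0; i < chunkCount; i++)
        {
            await destination.WriteAsync(chunk, 0, chunk.Length);
        }
    }

    // Hypothetical stand-in for the data sink: it only knows how to read from a Stream.
    public static async Task<long> ConsumeAsync(Stream source)
    {
        var buffer = new byte[8192];
        long total = 0;
        int read;
        while ((read = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            total += read;
        }
        return total; // reached only once the writer side has completed
    }

    public static async Task<long> RunAsync(int chunkCount, int chunkSize)
    {
        var pipe = new Pipe();

        using (Stream readSide = pipe.Reader.AsStream())
        {
            // Start the consumer first; it just waits until data arrives.
            Task<long> consumer = Task.Run(() => ConsumeAsync(readSide));

            using (Stream writeSide = pipe.Writer.AsStream())
            {
                await ProduceAsync(writeSide, chunkCount, chunkSize);
            } // disposing the writer stream completes the pipe, so the reader sees end-of-stream

            return await consumer;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(chunkCount: 100, chunkSize: 4096)); // prints 409600
    }
}
```

Scoping the writer stream in its own using block makes the early dispose happen at the right moment, before the consumer is awaited.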

Spraddle answered 4/11, 2021 at 2:7 Comment(0)
