You are looking for a stream that can be passed to both the data source and the sink and that can 'transfer' the data between the two asynchronously. There are a number of possible solutions; in the past I might have reached for a producer-consumer pattern built around a BlockingCollection.
Recently, though, the addition of the System.IO.Pipelines, Span and Memory types has put a real focus on high-performance IO, and I think they are a good fit here. The Pipe class, with its associated Reader and Writer, automatically handles the flow control, back pressure and IO between the two ends, whilst making use of all the new Span- and Memory-related types.
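To give a flavour of the primitives involved, here is a standalone illustration of the raw Pipe API (illustrative only, and not code from the Gist below):

using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

var pipe = new Pipe();

// Producer: write some bytes, then tell the reader we're done.
var producing = Task.Run(async () =>
{
    await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("hello pipe"));
    pipe.Writer.Complete();
});

// Consumer: keep reading until the writer signals completion.
while (true)
{
    ReadResult result = await pipe.Reader.ReadAsync();
    foreach (var segment in result.Buffer)
        Console.Write(Encoding.ASCII.GetString(segment.ToArray()));
    pipe.Reader.AdvanceTo(result.Buffer.End); // mark everything as consumed
    if (result.IsCompleted) break;
}
pipe.Reader.Complete();
await producing;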
I have uploaded a Gist at PipeStream that gives you a custom stream with an internal Pipe implementation, which you can pass to both of your API classes. Whatever is written via WriteAsync (or Write) is made available to ReadAsync (or Read) without requiring any further byte[] or MemoryStream allocations.
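The Gist has the full implementation; to show the shape of the idea, a stripped-down sketch might look something like this (PipeStreamSketch is a name I've invented for illustration, and this is not the Gist's exact code):

using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

public class PipeStreamSketch : Stream
{
    private readonly Pipe _pipe = new Pipe();

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false; // forward-only: see the note at the end

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        // Awaiting the FlushResult is what gives us back pressure for free.
        await _pipe.Writer.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), ct);
    }

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        ReadResult result = await _pipe.Reader.ReadAsync(ct);
        var slice = result.Buffer.Slice(0, Math.Min(count, result.Buffer.Length));
        slice.CopyTo(new Span<byte>(buffer, offset, (int)slice.Length));
        _pipe.Reader.AdvanceTo(slice.End);
        return (int)slice.Length; // 0 once the writer has completed and the pipe is drained
    }

    protected override void Dispose(bool disposing)
    {
        _pipe.Writer.Complete(); // unblocks a pending ReadAsync with 0 bytes
        base.Dispose(disposing);
    }

    // Synchronous wrappers and the unsupported seek surface.
    public override void Write(byte[] buffer, int offset, int count) => WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
    public override int Read(byte[] buffer, int offset, int count) => ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
}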
In your case you would simply substitute the MemoryStream for this new class and it should work out of the box. I haven't got a full S3 test working, but reading directly from the Parquet stream and dumping it to the console window shows that it works asynchronously:
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Parquet;
using Parquet.Data;

// Create some very badly 'mocked' data
var idColumn = new DataColumn(
    new DataField<int>("id"),
    Enumerable.Range(0, 10000).ToArray());

var cityColumn = new DataColumn(
    new DataField<string>("city"),
    Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());

var schema = new Schema(idColumn.Field, cityColumn.Field);

Task readTask;

using (var pipeStream = new PipeStream())
{
    var buffer = new byte[4096];

    // The consumer: in your case this is where transferUtil.Upload(pipeStream, "bucketName", "key")
    // would run, in its own Task / thread.
    readTask = Task.Run(async () =>
    {
        int read;
        while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            var incoming = Encoding.ASCII.GetString(buffer, 0, read);
            Console.WriteLine(incoming);
            // await Task.Delay(5000); // uncomment this to simulate a very slow consumer
        }
    });

    // ParquetWriter's Dispose finishes the file and transferUtil closes the stream,
    // so we need this slightly odd using nesting to keep everyone happy.
    using (var parquetWriter = new ParquetWriter(schema, pipeStream))
    using (var rowGroupWriter = parquetWriter.CreateRowGroup())
    {
        rowGroupWriter.WriteColumn(idColumn);   // step through these two statements to see
        rowGroupWriter.WriteColumn(cityColumn); // the data being read before the writer completes
    }
}

readTask.Wait(); // give the consumer a chance to drain the last of the pipe
The implementation is not completely finished, but I think it shows a nice approach. In the console 'readTask' you can uncomment the Task.Delay to simulate a slow read (transferUtil) and you should see the pipe automatically throttle the write task.
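Incidentally, the thresholds at which the pipe pauses and resumes the writer are configurable via PipeOptions. Wiring the options through the Gist's PipeStream is my assumption, but pauseWriterThreshold and resumeWriterThreshold are real PipeOptions parameters, and the internal Pipe could be constructed like this:

// Pause WriteAsync once 64KB is buffered unread; resume once it drops to 32KB.
var pipe = new Pipe(new PipeOptions(
    pauseWriterThreshold: 64 * 1024,
    resumeWriterThreshold: 32 * 1024));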
You need to be using C# 7.2 or later (VS 2017 -> Project Properties -> Build -> Advanced -> Language Version) for one of the Span extension methods, but it should be compatible with any .NET Framework. You may also need the System.IO.Pipelines NuGet package.
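If you prefer editing the project file directly, the same language-version setting looks like this in the .csproj:

<PropertyGroup>
  <LangVersion>7.2</LangVersion>
</PropertyGroup>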
The stream is readable and writable (obviously!) but not seekable, which should work for you in this scenario but wouldn't work for reading with the Parquet SDK, which requires seekable streams.
Hope it helps.