I am trying to implement a data processing pipeline using TPL Dataflow. However, I am relatively new to dataflow and not completely sure how to use it properly for the problem I am trying to solve.
Problem:
I am trying to iterate through a list of files and process each file to read some data and then further process that data. Each file is roughly 700MB to 1GB in size and contains JSON data. In order to process these files in parallel and not run out of memory, I am trying to use IEnumerable<> with yield return and then further process the data.
Once I get the list of files, I want to process a maximum of 4-5 files at a time in parallel. My confusion comes from:
- How to use IEnumerable<> and yield return with async/await and dataflow. I came across this answer by svick, but I am still not sure how to convert IEnumerable<> to ISourceBlock and then link all the blocks together and track completion.
- In my case, the producer will be really fast (going through the list of files), but the consumer will be very slow (processing each file: read data, deserialize JSON). In this case, how do I track completion?
- Should I use the LinkTo feature of the dataflow blocks to connect the various blocks, or use methods such as OutputAvailableAsync() and ReceiveAsync() to propagate data from one block to another?
Code:
    private const int ProcessingSize = 4;
    private BufferBlock<string> _fileBufferBlock;
    private ActionBlock<string> _processingBlock;
    private BufferBlock<DataType> _messageBufferBlock;

    public Task ProduceAsync(CancellationToken token)
    {
        PrepareDataflow(token);
        var bufferTask = ListFilesAsync(_fileBufferBlock, token);
        var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
        return Task.WhenAll(tasks);
    }

    private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
    {
        ...
        // Get list of file Uris
        ...
        foreach (var fileNameUri in fileNameUris)
            await targetBlock.SendAsync(fileNameUri, token);

        targetBlock.Complete();
    }

    private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
    {
        var httpClient = new HttpClient();
        try
        {
            using (var stream = await httpClient.GetStreamAsync(fileNameUri))
            using (var sr = new StreamReader(stream))
            using (var jsonTextReader = new JsonTextReader(sr))
            {
                // Stream the file token by token instead of loading it whole.
                while (jsonTextReader.Read())
                {
                    if (jsonTextReader.TokenType == JsonToken.StartObject)
                    {
                        try
                        {
                            var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader);
                            await _messageBufferBlock.SendAsync(data, token);
                        }
                        catch (Exception ex)
                        {
                            _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
                        }
                    }
                }
            }
        }
        catch (Exception ex)
        {
            // Should throw?
            // Or if converted to block then report using Fault() method?
        }
        finally
        {
            httpClient.Dispose();
            // 'buffer' is not declared in this snippet (presumably the output buffer).
            buffer.Complete();
        }
    }

    private void PrepareDataflow(CancellationToken token)
    {
        _fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
        {
            CancellationToken = token
        });

        var actionExecuteOptions = new ExecutionDataflowBlockOptions
        {
            CancellationToken = token,
            BoundedCapacity = ProcessingSize,
            MaxMessagesPerTask = 1,
            MaxDegreeOfParallelism = ProcessingSize
        };

        _processingBlock = new ActionBlock<string>(async fileName =>
        {
            try
            {
                await ProcessFileAsync(fileName, token);
            }
            catch (Exception ex)
            {
                _logger.Fatal(ex, $"Failed to process file: {fileName}, Error: {ex.Message}");
                // Should fault the block?
            }
        }, actionExecuteOptions);

        _fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

        _messageBufferBlock = new BufferBlock<DataType>(new DataflowBlockOptions
        {
            CancellationToken = token,
            BoundedCapacity = 50000
        });

        // Placeholder consumer: discards every message.
        _messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
    }
In the above code, I am not using IEnumerable<DataType> and yield return, as I cannot use them with async/await. So I am linking the input buffer to ActionBlock<string>, which in turn posts to another queue. However, by using ActionBlock<>, I cannot link it to the next block for processing and have to manually Post/SendAsync from ActionBlock<> to BufferBlock<>. Also, in this case, I am not sure how to track completion.
This code works, but I am sure there could be a better solution than this, one where I can just link all the blocks (instead of using ActionBlock<string> and then sending messages from it to BufferBlock<DataType>).
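To make the "just link all the blocks" idea concrete, here is a minimal sketch, not a definitive implementation. It assumes the per-file reading can be expressed as a synchronous iterator, which is what TransformManyBlock accepts. The names PipelineSketch, ReadRecords, and RunAsync are hypothetical, and ReadRecords only fakes three records per file in place of the real JsonTextReader loop. Completion is tracked by linking with PropagateCompletion and awaiting the last block. The sketch makes no claim about memory behaviour for very large files.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Hypothetical stand-in for streaming records out of one file;
    // the real version would wrap the JsonTextReader loop with yield return.
    private static IEnumerable<string> ReadRecords(string fileName)
    {
        for (var i = 0; i < 3; i++)
            yield return $"{fileName}:{i}";
    }

    // Returns the number of records seen by the final block.
    public static async Task<int> RunAsync(IEnumerable<string> fileNames)
    {
        var processed = 0;

        // One input file name produces many output records.
        var readBlock = new TransformManyBlock<string, string>(
            fileName => ReadRecords(fileName),
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 4,
                MaxDegreeOfParallelism = 4
            });

        // Default MaxDegreeOfParallelism is 1, so the plain increment is safe here.
        var consumeBlock = new ActionBlock<string>(
            record => { processed++; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1000 });

        // Completion flows from readBlock to consumeBlock through the link.
        readBlock.LinkTo(consumeBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var fileName in fileNames)
            await readBlock.SendAsync(fileName); // waits while readBlock is full

        readBlock.Complete();
        await consumeBlock.Completion; // completes once every record is consumed
        return processed;
    }
}
```

With this shape, only the last block's Completion needs to be awaited; the producer loop signals the end of input by calling Complete() on the first block.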
Another option could be to convert IEnumerable<> to IObservable<> using Rx, but again, I am not very familiar with Rx and don't know exactly how to mix TPL Dataflow and Rx.
... async idiom in C# provides a very good, clean way to implement asynchronous code in a virtually non-asynchronous form, which is useful regardless of any performance benefits. YMMV. – Unconscionable
... HttpClient and opens a stream, which I feed into StreamReader and JsonTextReader. So yes, to answer your comments (@usr): there will be IO work, and async/await also provides a cleaner way to implement the code. – Scheers
... JSON.NET does not support async/await yet. Think of a scenario whereby I don't have the file on disk, but hosted on a web server. So, I would have something like stream = await httpClient.GetStreamAsync(uri) and then pass that stream to StreamReader and in turn JsonTextReader. Also, the file would be really large, so instead of deserializing the whole file at once, I would like to process it record by record. That way I would not hit OutOfMemoryException. – Scheers
... _fileBufferBlock implementation feels a bit... dirty. All you're doing is dumping your file names into the BufferBlock, which does not have a capacity limit, incurring async/await overheads for zero benefit. You could reduce the number of moving parts and just post or send each of your filenames directly to your ActionBlock instead. That ActionBlock also has a BoundedCapacity, and so will throttle the producer for you, thereby managing the back-pressure (which may be important as you stated that your producer is much faster than consumer). – Salts
... BufferBlock<>, I can directly post to ActionBlock<> or TransformBlock<,> or IPropagatorBlock<,> and set BoundedCapacity and MaxMessagesPerTask and MaxDegreeOfParallelism on that block. The code above is sort of sample code. – Scheers
... DataType instances for a file at the same time (into an array), then you can use TransformManyBlock. If it's not, then what you have is as good as dataflow can do (Rx can do more). – Principle
... IEnumerable<> or IObservable<> and process the records from file. – Scheers