Using async/await and yield return with TPL Dataflow

I am trying to implement a data processing pipeline using TPL Dataflow. However, I am relatively new to dataflow and not completely sure how to use it properly for the problem I am trying to solve.

Problem:

I am trying to iterate through the list of files and process each file to read some data and then further process that data. Each file is roughly 700MB to 1GB in size and contains JSON data. In order to process these files in parallel without running out of memory, I am trying to use IEnumerable<> with yield return and then further process the data.

Once I have the list of files, I want to process at most 4-5 files at a time in parallel. My confusion comes from:

  • How to use IEnumerable<> and yield return with async/await and dataflow. I came across this answer by svick, but I am still not sure how to convert an IEnumerable<> to an ISourceBlock, then link all the blocks together and track completion.
  • In my case, the producer will be really fast (going through the list of files), but the consumer will be very slow (processing each file: reading the data and deserializing the JSON). In this case, how do I track completion?
  • Should I use the LinkTo feature of dataflow blocks to connect the various blocks, or use methods such as OutputAvailableAsync() and ReceiveAsync() to propagate data from one block to another?

Code:

private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync(CancellationToken token)
{
    PrepareDataflow(token);
    var bufferTask = ListFilesAsync(_fileBufferBlock, token);

    var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
    return Task.WhenAll(tasks);
}

private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
    ...
    // Get list of file Uris
    ...
    foreach(var fileNameUri in fileNameUris)
        await targetBlock.SendAsync(fileNameUri, token);

    targetBlock.Complete();
}

private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
    var httpClient = new HttpClient();
    try
    {
        using (var stream = await httpClient.GetStreamAsync(fileNameUri))
        using (var sr = new StreamReader(stream))
        using (var jsonTextReader = new JsonTextReader(sr))
        {
            while (jsonTextReader.Read())
            {
                if (jsonTextReader.TokenType == JsonToken.StartObject)
                {
                    try
                    {
                        var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader);
                        await _messageBufferBlock.SendAsync(data, token);
                    }
                    catch (Exception ex)
                    {
                        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
                    }
                }
            }
        }
    }
    catch(Exception ex)
    {
        // Should throw?
        // Or if converted to block then report using Fault() method?
    }
    finally
    {
        httpClient.Dispose();
        // Should _messageBufferBlock be completed here, or somewhere else?
    }
}

private void PrepareDataflow(CancellationToken token)
{
    _fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
    {
        CancellationToken = token
    });

    var actionExecuteOptions = new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = ProcessingSize,
        MaxMessagesPerTask = 1,
        MaxDegreeOfParallelism = ProcessingSize
    };
    _processingBlock = new ActionBlock<string>(async fileName =>
    {
        try
        {
            await ProcessFileAsync(fileName, token);
        }
        catch (Exception ex)
        {
            _logger.Fatal(ex, $"Failed to process file: {fileName}, Error: {ex.Message}");
            // Should fault the block?
        }
    }, actionExecuteOptions);

    _fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    _messageBufferBlock = new BufferBlock<DataType>(new DataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = 50000
    });
    _messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}

In the above code, I am not using IEnumerable<DataType> and yield return, as I cannot use them with async/await. So I am linking the input buffer to an ActionBlock<string>, which in turn posts to another queue. However, because an ActionBlock<> produces no output, I cannot link it to the next block for processing and have to manually Post/SendAsync from the ActionBlock<> to the BufferBlock<>. Also, in this case, I am not sure how to track completion.

This code works, but I am sure there is a better solution than this, where I can just link all the blocks (instead of using an ActionBlock<string> and then sending messages from it to a BufferBlock<DataType>).

Another option could be to convert IEnumerable<> to IObservable<> using Rx, but again I am not very familiar with Rx and don't know exactly how to mix TPL Dataflow and Rx.

Scheers answered 12/2, 2016 at 20:48 Comment(21)
Your processing is CPU bound. Therefore, async IO is pointless. It does not save you one millisecond of processing time. Delete everything async and the problem becomes easy. - Plumule
@usr: I haven't looked closely at this specific scenario; the question is too broadly stated, and doesn't provide a good minimal reproducible example with which one would actually fully understand the context. It may well be that async operations here are not useful. However, it is IMHO a fallacy to think that just because processing is CPU bound, async I/O is "pointless". Async operations provide architectural benefits independent of possible performance benefits, and the lack of the latter does not preclude the possibility of the former. - Unconscionable
@PeterDuniho What architectural benefits are there? You can always simulate any form of concurrency or parallelism using threads. The only point of async IO is being threadless (and in case of async IO plus await being awesome with GUI scenarios). The code quality detriments are significant, however. - Plumule
Going to refrain from closing this. Somehow I think there is a good core to this question. Since it is novel material, as opposed to the 100 rote async questions per day ("Oh my app locked up because I called Result or Wait!"), I'll give this the benefit of the doubt. @PeterDuniho - Plumule
"The only point of async IO is being threadless" -- I guess we'll have to agree to disagree. First, async I/O isn't even "threadless"; it just happens to use the IOCP thread pool instead of requiring additional explicitly-created threads. Second, the async idiom in C# provides a very good, clean way to implement asynchronous code in a virtually non-asynchronous form, which is useful regardless of any performance benefits. YMMV. - Unconscionable
Actually, there is not even an IOCP thread. blog.stephencleary.com/2013/11/there-is-no-thread.html If there are 10m sockets open and being read, there are not 10m IOCP threads. Maybe a few dozen. I love await, though :) - Plumule
There will be IO-bound work here: the part that reads the file, either from disk or using HttpClient, and opens a stream, which I feed into StreamReader and JsonTextReader. So yes, to answer your comments (@usr), there will be IO work, and async/await also provides a cleaner way to implement the code. - Scheers
Though JSON.NET does not support async/await yet. Think of a scenario where I don't have the file on disk, but hosted on a web server. So I would have something like stream = await httpClient.GetStreamAsync(uri) and then pass that stream to StreamReader and in turn JsonTextReader. Also, the file would be really large, so instead of deserializing the whole file at once, I would like to process it record by record. That way I would not hit an OutOfMemoryException. - Scheers
@TejasVora, the _fileBufferBlock implementation feels a bit... dirty. All you're doing is dumping your file names into the BufferBlock, which does not have a capacity limit, incurring async/await overheads for zero benefit. You could reduce the number of moving parts and just post or send each of your filenames directly to your ActionBlock instead. That ActionBlock also has a BoundedCapacity, and so will throttle the producer for you, thereby managing the back-pressure (which may be important as you stated that your producer is much faster than consumer). - Salts
You might want to introduce a cache or queue in between when the producer and consumer are running at different paces (usually the producer is faster than the consumer). Depending on your requirements, the queue could be anything from something as simple as MSMQ to a high-performance one like RabbitMQ, Redis and so on. - Arredondo
@Saleem, respectfully, TPL Dataflow has perfectly functional controls for synchronising producer and consumer blocks, so bringing in a monster like MSMQ into this is absolutely, totally unnecessary. - Salts
@KirillShlenskiy You are correct. Instead of posting to BufferBlock<>, I can directly post to ActionBlock<> or TransformBlock<,> or IPropagateBlock<,> and set BoundedCapacity and MaxMessagesPerTask and MaxDegreeOfParallelism on that block. The code above is sort of sample code. - Scheers
@TejasVora are there fragments of a file that could be processed in parallel, or is it strictly sequential? - Ambassador
@Ambassador it's strictly sequential, as it contains a JSON list. So I can't break it down and have different tasks process different parts. - Scheers
OK, another question: how complex is it to process a single DataType structure? - Ambassador
@Ambassador It's a fairly complex structure, with lots of nested classes and arrays. - Scheers
I should have clarified the question: after a DataType instance is constructed, is there still some expensive processing to be done? - Ambassador
@Ambassador yes, I need to push that data to various datastores/databases. - Scheers
@TejasVora: If it is acceptable to read all DataType instances for a file at the same time (into an array), then you can use TransformManyBlock. If it's not, then what you have is as good as dataflow can do (Rx can do more). - Principle
@StephenCleary that's the problem. I cannot read everything from the file into an array or list; this would cause an out of memory exception. So the next choice I have is to use IEnumerable<> or IObservable<> and process the records from the file. - Scheers
@TejasVora: See my answer. Since the JSON readers force synchrony, that's the best you can do. - Principle

Question 1

You plug an IEnumerable<T> producer into your TPL Dataflow chain by using Post or SendAsync directly on the consumer block, as follows:

foreach (string fileNameUri in fileNameUris)
{
    await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}

You can also use a BufferBlock<TInput>, but in your case it actually seems rather unnecessary (or even harmful - see the next part).
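Putting Question 1 together end to end, here is a minimal self-contained sketch, assuming the System.Threading.Tasks.Dataflow package is referenced. The hypothetical ProduceUris iterator stands in for the list of file URIs and hands them out lazily with yield return; the loop pushes each one straight into a bounded ActionBlock with SendAsync, then calls Complete and awaits Completion to know when every file has been handled. The Task.Delay is only a placeholder for real per-file work.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EnumerableIntoDataflow
{
    // Hypothetical lazy producer standing in for the list of file URIs:
    // each URI is created only when the foreach loop below asks for it.
    public static IEnumerable<string> ProduceUris(int count)
    {
        for (int i = 0; i < count; i++)
            yield return $"file-{i}";
    }

    public static async Task<int> RunAsync(int count)
    {
        int processed = 0;

        var consumer = new ActionBlock<string>(async uri =>
        {
            await Task.Delay(1);                 // placeholder for slow per-file work
            Interlocked.Increment(ref processed);
        },
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 4,        // at most 4 URIs buffered or in flight
            MaxDegreeOfParallelism = 4  // at most 4 processed concurrently
        });

        foreach (string uri in ProduceUris(count))
        {
            // Waits while the block is full; false means it stopped accepting input.
            if (!await consumer.SendAsync(uri).ConfigureAwait(false))
                break;
        }

        consumer.Complete();                             // no more input will arrive
        await consumer.Completion.ConfigureAwait(false); // every accepted item is processed
        return processed;
    }
}
```

Calling RunAsync(20) completes only after all 20 items have gone through the block, so it returns 20.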

Question 2

When would you prefer SendAsync instead of Post? If your producer runs faster than the URIs can be processed (and you have indicated this to be the case), and you choose to give your _processingBlock a BoundedCapacity, then once the block's internal buffer reaches the specified capacity, the task returned by SendAsync will not complete until a buffer slot frees up, so your awaiting foreach loop is throttled. (Post, by contrast, returns false right away and the item is not accepted.) This feedback mechanism creates back pressure and ensures that you don't run out of memory.
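The Post/SendAsync difference is easy to observe with a small sketch, assuming the System.Threading.Tasks.Dataflow package; the TaskCompletionSource "gate" is only a device for keeping the first item in processing. With BoundedCapacity = 1 the item being processed still counts against the bound, so a second Post is declined immediately, while SendAsync returns a pending task that completes once the slot frees up.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVersusSendAsync
{
    public static async Task<(bool firstPost, bool secondPost, bool sendPendingWhileFull, bool sendResult)> RunAsync()
    {
        // Keeps the first item "in processing" until we release it.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var block = new ActionBlock<int>(
            async item => await gate.Task,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);   // accepted: the block now holds one item
        bool secondPost = block.Post(2);  // declined at once: capacity reached

        Task<bool> send = block.SendAsync(3); // postponed rather than declined
        bool pending = !send.IsCompleted;

        gate.SetResult(true);             // item 1 finishes, freeing the slot
        bool sendResult = await send;     // item 3 is now accepted

        block.Complete();
        await block.Completion;
        return (firstPost, secondPost, pending, sendResult);
    }
}
```

The tuple comes back as (true, false, true, true): the declined Post loses item 2, whereas the awaited SendAsync simply waits its turn.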

Question 3

You should definitely use the LinkTo method to link your blocks in most cases. Unfortunately yours is a corner case due to the interplay of IDisposable and very large (potentially) sequences. So your completion will flow automatically between the buffer and processing blocks (due to LinkTo), but after that - you need to propagate it manually. This is tricky, but doable.

I'll illustrate this with a "Hello World" example where the producer iterates over each character and the consumer (which is really slow) outputs each character to the Debug window.

Note: LinkTo is not present.

// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
    await Task.Delay(100);

    Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

var producer = new ActionBlock<string>(async s =>
{
    foreach (char c in s)
    {
        await consumer.SendAsync(c);

        Debug.Print($"Yielded {c}");
    }
});

try
{
    producer.Post("Hello world");
    producer.Complete();

    await producer.Completion;
}
finally
{
    consumer.Complete();
}

// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);

This outputs:

Yielded H
H
Yielded e
e
Yielded l
l
Yielded l
l
Yielded o
o
Yielded  

Yielded w
w
Yielded o
o
Yielded r
r
Yielded l
l
Yielded d
d

As you can see from the output above, the producer is throttled and the handover buffer between the blocks never grows too large.

EDIT

You might find it cleaner to propagate completion via

producer.Completion.ContinueWith(
    _ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);

... right after producer definition. This allows you to slightly reduce producer/consumer coupling - but at the end you still have to remember to observe Task.WhenAll(producer.Completion, consumer.Completion).
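One consequence of the ContinueWith form is worth seeing: as written, it calls Complete() whatever the producer's outcome, so a faulted producer leaves the consumer completing normally, and the error only surfaces through Task.WhenAll. A sketch under the same package assumption, with a producer that throws deliberately after sending its characters:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ContinueWithPropagation
{
    public static async Task<(bool consumerRanToCompletion, bool whenAllFaulted)> RunAsync()
    {
        var consumer = new ActionBlock<char>(
            c => { /* per-character work would go here */ },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        var producer = new ActionBlock<string>(async s =>
        {
            foreach (char c in s)
                await consumer.SendAsync(c);
            throw new InvalidOperationException("producer failed"); // simulated failure
        });

        // Completes (does not fault) the consumer, whatever the producer's outcome.
        _ = producer.Completion.ContinueWith(
            t => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously);

        producer.Post("Hello");
        producer.Complete();

        bool whenAllFaulted = false;
        try
        {
            await Task.WhenAll(producer.Completion, consumer.Completion);
        }
        catch (InvalidOperationException)
        {
            whenAllFaulted = true; // the producer's exception surfaces here
        }

        return (consumer.Completion.Status == TaskStatus.RanToCompletion, whenAllFaulted);
    }
}
```

RunAsync returns (true, true): the consumer ran to completion, and only the combined WhenAll reported the producer's failure.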

Salts answered 13/2, 2016 at 4:24 Comment(0)

In order to process these files in parallel and not run out of memory, I am trying to use IEnumerable<> with yield return and then further process the data.

I don't believe this step is necessary. What you're actually avoiding here is just a list of filenames. Even if you had millions of files, the list of filenames is just not going to take up a significant amount of memory.

I am linking input buffer to ActionBlock which in turn posts to another queue. However by using ActionBlock<>, I cannot link it to next block for processing and have to manually Post/SendAsync from ActionBlock<> to BufferBlock<>. Also, in this case, not sure, how to track completion.

ActionBlock<TInput> is an "end of the line" block. It only accepts input and does not produce any output. In your case, you don't want ActionBlock<TInput>; you want TransformManyBlock<TInput, TOutput>, which takes input, runs a function on it, and produces output (with any number of output items for each input item).
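A minimal sketch of TransformManyBlock, assuming the System.Threading.Tasks.Dataflow package. The hypothetical Records iterator stands in for the Process method in the answer's code: each input string fans out into one output item per character, and LinkTo with PropagateCompletion carries completion through to a final ActionBlock.

```csharp
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformManyExample
{
    // Hypothetical stand-in for a record reader: lazily yields one "record" at a time.
    static IEnumerable<char> Records(string file)
    {
        foreach (char c in file)
            yield return c;
    }

    public static async Task<string> RunAsync(IEnumerable<string> files)
    {
        var results = new StringBuilder();

        // One input (a file) produces any number of outputs (its records).
        var transform = new TransformManyBlock<string, char>(file => Records(file));
        var sink = new ActionBlock<char>(
            c => { results.Append(c); },  // default MaxDegreeOfParallelism = 1
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var f in files)
            await transform.SendAsync(f);
        transform.Complete();

        await sink.Completion;            // completion flowed through the link
        return results.ToString();
    }
}
```

With default options the output order follows the input order, so RunAsync(new[] { "ab", "cde" }) returns "abcde".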

Another point to keep in mind is that all dataflow blocks (not only BufferBlock) have an input buffer. So the extra BufferBlock is unnecessary.

Finally, if you're already in "dataflow land", it's usually best to end with a dataflow block that actually does something (e.g., ActionBlock instead of BufferBlock). In this case, you could use the BufferBlock as a bounded producer/consumer queue, where some other code is consuming the results. Personally, I think it may be cleaner to rewrite the consuming code as the action of an ActionBlock, but it may also be cleaner to keep the consumer independent of the dataflow. For the code below, I left in the final bounded BufferBlock, but if you use this solution, consider changing that final block to a bounded ActionBlock instead.

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Newtonsoft.Json;

private const int ProcessingSize = 4;
private static readonly HttpClient HttpClient = new HttpClient();
private TransformManyBlock<string, DataType> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync(CancellationToken token)
{
  PrepareDataflow(token);
  ListFiles(_processingBlock, token);
  _processingBlock.Complete();
  return _processingBlock.Completion;
}

private void ListFiles(ITargetBlock<string> targetBlock, CancellationToken token)
{
  ... // Get list of file Uris, occasionally calling token.ThrowIfCancellationRequested()
  foreach(var fileNameUri in fileNameUris)
    targetBlock.Post(fileNameUri);
}

private async Task<IEnumerable<DataType>> ProcessFileAsync(string fileNameUri, CancellationToken token)
{
  return Process(await HttpClient.GetStreamAsync(fileNameUri), fileNameUri, token);
}

private IEnumerable<DataType> Process(Stream stream, string fileNameUri, CancellationToken token)
{
  using (stream)
  using (var sr = new StreamReader(stream))
  using (var jsonTextReader = new JsonTextReader(sr))
  {
    while (jsonTextReader.Read())
    {
      token.ThrowIfCancellationRequested();
      if (jsonTextReader.TokenType != JsonToken.StartObject)
        continue;

      // C# does not allow yield return inside a try block that has a catch clause,
      // so deserialize inside the try and yield outside it.
      DataType data;
      try
      {
        data = _jsonSerializer.Deserialize<DataType>(jsonTextReader);
      }
      catch (Exception ex)
      {
        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
        continue;
      }
      yield return data;
    }
  }
}

private void PrepareDataflow(CancellationToken token)
{
  var executeOptions = new ExecutionDataflowBlockOptions
  {
    CancellationToken = token,
    MaxDegreeOfParallelism = ProcessingSize
  };
  _processingBlock = new TransformManyBlock<string, DataType>(fileName =>
      ProcessFileAsync(fileName, token), executeOptions);

  _messageBufferBlock = new BufferBlock<DataType>(new DataflowBlockOptions
  {
    CancellationToken = token,
    BoundedCapacity = 50000
  });

  // Send the deserialized records on to the buffer; completion and faults flow with them.
  _processingBlock.LinkTo(_messageBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

Alternatively, you could use Rx. Learning Rx can be pretty difficult though, especially for mixed asynchronous and parallel dataflow situations, which you have here.

As for your other questions:

How to use IEnumerable<> and yield return with async/await and dataflow.

async and yield are not compatible at all. At least in today's language. In your situation, the JSON readers have to read from the stream synchronously anyway (they don't support asynchronous reading), so the actual stream processing is synchronous and can be used with yield. Doing the initial back-and-forth to get the stream itself can still be asynchronous and can be used with async. This is as good as we can get today, until the JSON readers support asynchronous reading and the language supports async yield. (Rx could do an "async yield" today, but the JSON reader still doesn't support async reading, so it won't help in this particular situation).
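For readers on a newer toolchain: C# 8.0 later added async streams (IAsyncEnumerable<T> consumed with await foreach), which let a single method combine await and yield return. The sketch below assumes .NET Core 3.0 or later and the System.Threading.Tasks.Dataflow package; the hypothetical ReadRecordsAsync only simulates asynchronous record-by-record reading and does not use a JSON reader.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncStreamIntoDataflow
{
    // Hypothetical async iterator: awaits (simulated I/O) between records.
    public static async IAsyncEnumerable<int> ReadRecordsAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(1);   // stands in for an asynchronous read
            yield return i;
        }
    }

    public static async Task<int> RunAsync(int count)
    {
        int sum = 0;
        var consumer = new ActionBlock<int>(
            n => { sum += n; },    // MaxDegreeOfParallelism defaults to 1, so no data race
            new ExecutionDataflowBlockOptions { BoundedCapacity = 8 });

        await foreach (int record in ReadRecordsAsync(count))
        {
            // Back pressure: waits while 8 records are already queued or in flight.
            if (!await consumer.SendAsync(record))
                break;             // the block no longer accepts input
        }

        consumer.Complete();
        await consumer.Completion;
        return sum;
    }
}
```

RunAsync(10) sums the records 0 through 9 and returns 45.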

In this case, how to track completion.

If the JSON readers did support asynchronous reading, then the solution above would not be the best one. In that case, you would want to use a manual SendAsync call, and would need to link just the completion of these blocks, which can be done as such:

_processingBlock.Completion.ContinueWith(
    task =>
    {
      if (task.IsFaulted)
        ((IDataflowBlock)_messageBufferBlock).Fault(task.Exception);
      else if (!task.IsCanceled)
        _messageBufferBlock.Complete();
    },
    CancellationToken.None,
    TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
    TaskScheduler.Default);
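The continuation above can be exercised in isolation. A sketch, assuming the System.Threading.Tasks.Dataflow package: a hypothetical source ActionBlock fails on one item, and the same ContinueWith shape forwards that fault to a BufferBlock, whose Completion then ends up faulted.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ManualFaultPropagation
{
    public static async Task<bool> TargetFaultsAsync()
    {
        var source = new ActionBlock<int>(n =>
        {
            if (n == 2) throw new InvalidOperationException("bad item"); // simulated failure
        });
        var target = new BufferBlock<int>();

        // Same shape as the continuation above: forward a fault, otherwise complete.
        _ = source.Completion.ContinueWith(
            task =>
            {
                if (task.IsFaulted)
                    ((IDataflowBlock)target).Fault(task.Exception);
                else if (!task.IsCanceled)
                    target.Complete();
            },
            CancellationToken.None,
            TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        source.Post(1);
        source.Post(2);
        source.Complete();

        try { await target.Completion; }
        catch { /* expected: the source's fault was forwarded */ }

        return target.Completion.IsFaulted;
    }
}
```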

Should I use LinkTo feature of datablocks to connect various blocks? or use method such as OutputAvailableAsync() and ReceiveAsync() to propagate data from one block to another.

Use LinkTo whenever you can. It handles all the corner cases for you.

// Should throw? // Should fault the block?

That's entirely up to you. By default, when any processing of any item fails, the block faults, and if you are propagating completion, the entire chain of blocks would fault.

Faulting blocks are rather drastic; they throw away any work in progress and refuse to continue processing. You have to build a new dataflow mesh if you want to retry.
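Both behaviors described above, a fault flowing along a PropagateCompletion link and a faulted block declining further input, can be observed with a short sketch, assuming the System.Threading.Tasks.Dataflow package; int.Parse on a non-numeric string is just a convenient way to make one item fail.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultPropagationDemo
{
    public static async Task<(bool downstreamFaulted, bool postAfterFault)> RunAsync()
    {
        var parse = new TransformBlock<string, int>(s => int.Parse(s)); // throws on bad input
        var sink = new ActionBlock<int>(n => { });
        parse.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        parse.Post("1");
        parse.Post("oops");   // FormatException faults the TransformBlock
        parse.Post("3");      // may or may not be accepted, depending on timing

        try { await sink.Completion; }
        catch { /* expected: the fault flowed downstream */ }

        bool postAfterFault = parse.Post("4"); // a faulted block declines all input
        return (sink.Completion.IsFaulted, postAfterFault);
    }
}
```

The result is (true, false): the whole chain is faulted, and the failed block will not accept new work.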

If you prefer a "softer" error strategy, you can either catch the exceptions and do something like log them (which your code currently does), or you can change the nature of your dataflow block to pass along the exceptions as data items.
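A sketch of the second "softer" option, passing exceptions along as data items, assuming the System.Threading.Tasks.Dataflow package. Result<T> is a hypothetical wrapper written for this example, not a dataflow type; the delegate catches the exception itself, so the TransformBlock never faults and the downstream block decides what to do with each failure.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper: either a value or the exception that prevented it.
public sealed class Result<T>
{
    public T Value { get; }
    public Exception Error { get; }
    public bool IsSuccess => Error == null;
    private Result(T value, Exception error) { Value = value; Error = error; }
    public static Result<T> Ok(T value) => new Result<T>(value, null);
    public static Result<T> Fail(Exception error) => new Result<T>(default, error);
}

public static class ErrorsAsData
{
    public static async Task<(int ok, int failed)> RunAsync(IEnumerable<string> inputs)
    {
        int ok = 0, failed = 0;

        // Catch inside the delegate so the block itself never faults.
        var parse = new TransformBlock<string, Result<int>>(s =>
        {
            try { return Result<int>.Ok(int.Parse(s)); }
            catch (FormatException ex) { return Result<int>.Fail(ex); }
        });

        var sink = new ActionBlock<Result<int>>(r =>
        {
            if (r.IsSuccess) ok++;
            else failed++;    // e.g. log r.Error here instead
        });

        parse.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });
        foreach (var s in inputs)
            await parse.SendAsync(s);
        parse.Complete();

        await sink.Completion;
        return (ok, failed);
    }
}
```

For the inputs "1", "x" and "3", RunAsync returns (2, 1) and the pipeline completes normally.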

Principle answered 13/2, 2016 at 14:3 Comment(3)
Question: Why can't I use SendAsync here instead of Post? - Scheers
@TejasVora: Let me turn that around. What benefit would SendAsync have over Post in this case? - Principle
Nothing in particular. Just an informative question. - Scheers

It would be worth looking at Rx. Unless I'm missing something your entire code that you need (apart from your existing ProcessFileAsync method) would look like this:

var query =
    fileNameUris
        .Select(fileNameUri =>
            Observable
                .FromAsync(ct => ProcessFileAsync(fileNameUri, ct)))
        .Merge(maxConcurrent : 4);

var subscription =
    query
        .Subscribe(
            u => { },
            () => { Console.WriteLine("Done."); });

Done. It runs asynchronously, it can be cancelled by calling subscription.Dispose(), and you can specify the maximum parallelism.

Bureaucratic answered 13/2, 2016 at 5:47 Comment(0)
