Dart streams and await

I'm having trouble understanding the flow of the following code. The code should process MERGE_SIZE lines (3 in this run), save the lines to a 'phase' file, and then process the next 3 lines and so on.

The call to savePhase has an await, so I expected savePhase to complete before any more lines were processed.

As you can see in the output below, every line is processed first, and only then do the savePhase calls complete.

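import 'dart:async';
import 'dart:convert';
import 'dart:io';

// MERGE_SIZE, filename, lineDelimiter and savePhase are defined elsewhere.
Future<void> _sort() async {
    var completer = Completer<void>();
    var instance = 0;
    var lineCount = MERGE_SIZE;

    var phaseDirectory = Directory.systemTemp.createTempSync();

    var list = <String>[];

    var sentToPhase = false;

    await File(filename)
        .openRead()
        // utf8.decoder copes with multi-byte characters split across
        // chunks, which mapping utf8.decode over each chunk does not.
        .transform(utf8.decoder)
        .transform(LineSplitter())
        .forEach((l) async {
      // forEach calls this async callback once per line.
      list.add(l);
      print('$l linecount:$lineCount');
      lineCount--;

      if (lineCount == 0) {
        lineCount = MERGE_SIZE;
        instance++;
        sentToPhase = true;
        await savePhase(phaseDirectory, 1, instance, list, lineDelimiter);
        list.clear();
        print('savePhase completed');
      }
    });

    // ... (rest of _sort not shown)
}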

which outputs:

9 line linecount:3
8 line linecount:2
7 line linecount:1
6 line linecount:3
5 line linecount:2
4 line linecount:1
3 line linecount:3
2 line linecount:2
1 line linecount:1
savePhase completed
savePhase completed
savePhase completed

Does this have something to do with the stream that openRead uses to deliver the lines?

I thought I had await figured out, but apparently not :)

Abide asked 31/12, 2019 at 23:23

I have not tested your program, but I am fairly sure the problem is that you expect forEach() to wait for the Future returned by your callback to complete before it calls the callback with the next line, which is not the case.
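A minimal way to see this, using an in-memory stream instead of a file and a short `Future.delayed` in place of `savePhase` (both are assumptions made only for this illustration; `forEachOrder` is a made-up name): the callback for the next element starts before the previous one has finished, and the Future returned by `forEach` completes before any of them finish.

```dart
/// Records when each async forEach callback starts and finishes.
/// Future.delayed stands in for slow work such as savePhase.
Future<List<String>> forEachOrder() async {
  final log = <String>[];
  await Stream.fromIterable(['a', 'b', 'c']).forEach((item) async {
    log.add('start $item');
    await Future<void>.delayed(const Duration(milliseconds: 10));
    log.add('end $item');
  });
  log.add('forEach returned');
  return log;
}

Future<void> main() async {
  final log = await forEachOrder();
  print(log); // [start a, start b, start c, forEach returned]
  // The callbacks are still running; give them time to finish.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  print(log); // [start a, start b, start c, forEach returned, end a, end b, end c]
}
```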

Take a look at the following question, which covers more or less the same problem: "Sequential processing of a variable number of async functions in Dart".
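If the goal is to read the next line only after the previous batch has been saved, and to have the function complete only after the last save, one option (not necessarily what the linked answer does) is an `await for` loop, which does not take the next element until the loop body, including its awaits, has finished. A minimal sketch; the `savePhase` stub, `mergeSize`, `sortSequential` and the `log` list are hypothetical stand-ins for the question's code.

```dart
import 'dart:convert';

const mergeSize = 3; // plays the role of MERGE_SIZE

/// Hypothetical stand-in for savePhase: pretends to write one batch.
Future<void> savePhase(List<String> batch, List<String> log) async {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  log.add('saved ${batch.join('|')}');
}

/// Reads lines in batches of mergeSize. The next line is not taken until the
/// previous batch has been saved, and the returned Future completes only
/// after the last save.
Future<List<String>> sortSequential(Stream<String> lines) async {
  final log = <String>[];
  var batch = <String>[];
  await for (final l in lines) {
    batch.add(l);
    log.add('read $l');
    if (batch.length == mergeSize) {
      await savePhase(batch, log); // the loop waits here before the next line
      batch = <String>[]; // fresh list rather than clearing a shared one
    }
  }
  if (batch.isNotEmpty) {
    await savePhase(batch, log); // flush a final partial batch
  }
  return log;
}

Future<void> main() async {
  final lines = Stream<String>.value(
          '9 line\n8 line\n7 line\n6 line\n5 line\n4 line\n3 line\n')
      .transform(const LineSplitter());
  (await sortSequential(lines)).forEach(print);
}
```

With seven input lines this prints each batch's "read" entries followed by its "saved" entry, ending with the partial batch `saved 3 line`.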

Flow of the program

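What happens in your code is that the file you are reading seems to be small enough that its whole content is read in one go into a single buffer, so it arrives as one chunk on the stream returned by openRead(). LineSplitter then emits all the lines from that chunk one after another, and forEach calls your callback for each of them without waiting for the previous call's Future. This explains why you see every "linecount" line before any "savePhase completed".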
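This flow can be reproduced without a file. The sketch below assumes a stub `savePhase` that only waits 10 ms, and feeds the nine lines as a single string, roughly what `openRead()` delivers for a small file; `reproduce` and `mergeSize` are illustrative names. The log matches the question's output, and `reproduce` returns while all three saves are still pending.

```dart
import 'dart:convert';

const mergeSize = 3; // plays the role of MERGE_SIZE

/// Hypothetical stand-in for savePhase that only simulates a slow write.
Future<void> savePhase(List<String> lines) =>
    Future<void>.delayed(const Duration(milliseconds: 10));

/// Mirrors the question's loop, reading from one in-memory chunk.
Future<List<String>> reproduce() async {
  final log = <String>[];
  final list = <String>[];
  var lineCount = mergeSize;
  // One chunk: LineSplitter emits the nine lines back to back while handling it.
  const chunk = '9 line\n8 line\n7 line\n6 line\n5 line\n'
      '4 line\n3 line\n2 line\n1 line\n';

  await Stream<String>.value(chunk)
      .transform(const LineSplitter())
      .forEach((l) async {
    list.add(l);
    log.add('$l linecount:$lineCount');
    lineCount--;
    if (lineCount == 0) {
      lineCount = mergeSize;
      await savePhase(list);
      list.clear();
      log.add('savePhase completed');
    }
  });
  return log; // the three savePhase calls have not finished yet
}

Future<void> main() async {
  final log = await reproduce();
  // Let the pending savePhase calls finish before printing.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  log.forEach(print);
}
```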

As mentioned previously, the forEach() method on Stream ignores whatever its callback returns, so a Future returned by an async callback is never awaited. You can see this in the implementation shown here: https://api.dart.dev/stable/2.7.0/dart-async/Stream/forEach.html
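To make this concrete, here is a simplified model of what `forEach` does. It is a sketch written for this answer, not the SDK source; `simpleForEach` is a made-up name, and the SDK's handling of errors thrown inside the callback is left out. It listens to the stream, calls `action` for each element, discards whatever `action` returns (the parameter is typed as returning `void`), and completes when the stream is done.

```dart
import 'dart:async';

/// Simplified model of Stream.forEach (not the SDK implementation).
/// The value returned by [action] is discarded, so if [action] is an
/// async function its Future is never awaited.
Future<void> simpleForEach<T>(Stream<T> stream, void Function(T) action) {
  final done = Completer<void>();
  stream.listen(
    action, // called once per element; a returned Future is ignored
    onError: done.completeError,
    onDone: done.complete, // completes when the stream ends, nothing more
    cancelOnError: true,
  );
  return done.future;
}

Future<void> main() async {
  final log = <String>[];
  await simpleForEach<int>(Stream.fromIterable([1, 2]), (n) async {
    await Future<void>.delayed(const Duration(milliseconds: 10));
    log.add('finished $n');
  });
  print(log); // [] : the callbacks' Futures were not awaited
}
```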

This means the Future returned by forEach() completes once every line has been passed to your callback, but it does not wait for the Future created for each line (remember, an async function always returns a Future, whether or not it contains an await).
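If concurrent saves are acceptable and the only requirement is that the caller does not continue until every save has finished, one option is to keep those Futures instead of dropping them and wait for all of them with `Future.wait`. A minimal sketch; the `savePhase` stub and the `sortAndWait` name are hypothetical.

```dart
/// Hypothetical stand-in for savePhase: simulates a slow write.
Future<void> savePhase(int batch, List<String> log) async {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  log.add('saved batch $batch');
}

/// Starts a save for each batch as it arrives, then waits for all of them.
Future<List<String>> sortAndWait(Stream<int> batches) async {
  final log = <String>[];
  final pending = <Future<void>>[];
  await batches.forEach((b) {
    log.add('queued batch $b');
    pending.add(savePhase(b, log)); // keep the Future instead of discarding it
  });
  // forEach only waited for the stream to end; this waits for the saves.
  await Future.wait(pending);
  log.add('all saves done');
  return log;
}

Future<void> main() async {
  final log = await sortAndWait(Stream.fromIterable([1, 2, 3]));
  log.forEach(print);
}
```

With this approach the saves run concurrently, so it guarantees that everything has finished before the function returns, but not the one-batch-at-a-time order the question originally expected.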

Since the Futures spawned for each line also share variables, you can get some odd behavior as well: for example, they all share the same list, and each one clears it after its own savePhase call completes, possibly while other savePhase calls are still using it. So there is a real potential for errors here.
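A small sketch of the shared-list problem, assuming a `savePhase` stub that only reads its list after an `await` (as an asynchronous file write might). Lines are batched in pairs; `batchSaves` and `snapshot` are made-up names. When every save receives the same list, the first save sees all four lines, because the later lines were added before it got to run; giving each save its own copy with `List.of` keeps the batches separate.

```dart
/// Hypothetical savePhase that reads its list only after a delay,
/// as an asynchronous file write might.
Future<void> savePhase(List<String> lines, List<String> saved) async {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  saved.add(lines.join('|'));
}

/// Saves lines in pairs. With snapshot == false every save shares `list`
/// (as in the question); with snapshot == true each save gets a copy.
Future<List<String>> batchSaves({required bool snapshot}) async {
  final saved = <String>[];
  final list = <String>[];
  var count = 0;
  await Stream.fromIterable(['a', 'b', 'c', 'd']).forEach((l) async {
    list.add(l);
    count++;
    if (count % 2 == 0) {
      if (snapshot) {
        final batch = List.of(list); // private copy of this batch
        list.clear();
        await savePhase(batch, saved);
      } else {
        await savePhase(list, saved); // same list object for every save
        list.clear();
      }
    }
  });
  // forEach does not wait for the saves; crudely give them time to finish.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  return saved;
}

Future<void> main() async {
  print(await batchSaves(snapshot: false)); // first entry is a|b|c|d
  print(await batchSaves(snapshot: true)); // [a|b, c|d]
}
```

What the second shared save sees depends on whether the first callback's `list.clear()` has run by then, which is exactly the kind of timing dependence that makes sharing the list fragile.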

Janot answered 1/1, 2020 at 1:39
I know how to solve the problem (although the linked solution won't); I'm trying to understand exactly what is going on. While the linked solution sequences the calls, I need to ensure that the main method doesn't return until all of the savePhase calls have completed. The linked solution can return as soon as the last expensive function is called; it doesn't wait for it to complete. – Abide
OK, I will update my answer with a description of the flow (which is somewhat of a mess, which is why I did not include it at first). :D – Janot
