Can exceptions thrown in dart streams be handled by subscribers without closing the stream?

Short example of what I'm having trouble understanding:

Stream<int> getNumbersWithException() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
    if (i == 3) throw Exception();
  }
}

With usage:

getNumbersWithException()
    .handleError((x) => print('Exception caught for $x'))
    .listen((event) {
  print('Observed: $event');
});

This will stop at 3 with the output:

Observed: 0
Observed: 1
Observed: 2
Observed: 3
Exception caught for Exception

From the documentation (https://dart.dev/tutorials/language/streams) and (https://api.dart.dev/stable/2.9.1/dart-async/Stream/handleError.html), this is as expected, as exceptions thrown will automatically close the stream.

  1. Does this mean that the correct way to handle exceptions in a stream, so that subscriptions can be long-lived in such an event, is to handle the exception inside the stream itself? That it is not possible to do so from the outside?
  2. Is this the same for broadcast streams?
  3. If I'm thinking about this in the wrong way, what are some pointers to start thinking right?

I'm currently thinking of streams as being a source of asynchronous data events that occasionally might be error events. From the documentation and examples, it all looks neat, but I'm thinking that wanting to handle errors and otherwise continue observing the data stream is a normal use case. I'm having a hard time writing the code to do so. But, I might be going about this wrong. Any insights will be much appreciated.


Edit: I can add that I've tried various things like using a stream transformer, with the same result:

var transformer = StreamTransformer<int, dynamic>.fromHandlers(
  handleData: (data, sink) => sink.add(data),
  handleError: (error, stackTrace, sink) =>
      print('Exception caught for $error'),
  handleDone: (sink) => sink.close(),
);
getNumbersWithException().transform(transformer).listen((data) {
  print('Observed: $data');
});

Also, listen() has an optional argument cancelOnError that looks promising, but it defaults to false, so no cigar here.

Stereobate answered 15/8, 2020 at 10:54 Comment(1)
It is somewhat understandable that this might not be possible. Looking at the code where the exception is thrown, what would it mean for the stream to continue? The exception would need to be thrown in a way that was meant to let the stream continue, and exceptions typically do not work this way. Which raises the question: is there a way to do this? To create an error event, yet be able to proceed? - Stereobate

The generator method

Stream<int> getNumbersWithException() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
    if (i == 3) throw Exception();
  }
}

will terminate when you throw an exception. The throw works normally; it doesn't directly add the exception to the stream. Instead, it propagates out through the loop and the method body until the entire method body ends with the thrown exception. At that point the unhandled exception is added to the stream, and then the stream is closed because the body has ended.

So, the problem is not with the handling, but with the generation of the stream. You must indeed handle the error locally to keep it from ending the stream-generating body.
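As a rough sketch of what "handle the error locally" could look like: the try/catch sits inside the loop, so a failure in one iteration never escapes the generator body. The riskyStep helper and the getNumbersHandledLocally name are made up for illustration and are not part of the question's code.

```dart
// Hypothetical helper standing in for work that may fail.
int riskyStep(int i) {
  if (i == 3) throw Exception('failed at $i');
  return i;
}

Stream<int> getNumbersHandledLocally() async* {
  for (var i = 0; i < 6; i++) {
    try {
      yield riskyStep(i);
    } catch (e) {
      // Caught inside the generator body, so the loop keeps running
      // and the stream stays open. The listener never sees this error.
      print('Handled inside generator: $e');
    }
  }
}

Future<void> main() async {
  await for (final n in getNumbersHandledLocally()) {
    print('Observed: $n');
  }
}
```

The trade-off is that subscribers are never told about the failure; the generator decides what to do with it.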

You can't add more than one error to a stream using throw in an async* method, and the error will be the last thing that stream does.

The available hack to actually emit more than one error is to yield the exception:

  if (i == 3) yield* () async* { throw Exception(); }();
  // or:      yield* Stream.fromFuture(Future.error(Exception()));
  // or:      yield* Stream.error(Exception()); // Since Dart 2.5

That will emit an exception directly into the generated stream without throwing it locally and ending the generator method body.
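A related sketch that is not from the answer above: outside of async* generators, StreamController.addError also adds an error event without closing the stream, and this works the same way on a broadcast controller, which touches on the question about broadcast streams. The runBroadcastDemo function and the listener labels A and B are hypothetical names used only for this example.

```dart
import 'dart:async';

Future<List<String>> runBroadcastDemo() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast();

  // Two independent listeners on the same broadcast stream.
  controller.stream.listen(
    (n) => log.add('A data: $n'),
    onError: (Object e) => log.add('A error: $e'),
  );
  controller.stream.listen(
    (n) => log.add('B data: $n'),
    onError: (Object e) => log.add('B error: $e'),
  );

  controller.add(1);
  controller.addError('first failure'); // error event, stream stays open
  controller.add(2);
  controller.addError('second failure'); // a second error is fine too
  controller.add(3);

  // Only close() ends the stream.
  await controller.close();
  return log;
}

Future<void> main() async {
  (await runBroadcastDemo()).forEach(print);
}
```

Each listener should receive both errors and all three data events. How the entries of A and B interleave in the log is not something this sketch relies on.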

Preshrunk answered 16/8, 2020 at 11:6 Comment(3)
Thank you! This was the missing piece of the puzzle. I'm finding it difficult to find this in the documentation. The general introduction on streams only alludes to it with "In this document we only discuss streams that deliver at most one error". This does indeed solve the use case I had in mind, where error and data events could be handled interdependently. One minor observation is that you mention "and the error will be the last thing that stream does.", perhaps you could clarify? When testing, these exceptions are picked up immediately by a handleError() before the stream closes. - Stereobate
Apologies. I misread. What you wrote makes complete sense. Thanks again for the clarifications, I'll follow SO custom and leave it unaccepted for a few days. - Stereobate
To add to the observations: listen's cancelOnError argument, which defaults to false, now also makes more sense, as it does make a difference when yielding the exceptions. The listener can now choose whether or not an exception should cancel the subscription. That seems more flexible. - Stereobate
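To make the cancelOnError observation concrete, here is a small sketch assuming a generator that yields its error with Stream.error as in the answer. The names numbersWithErrorEvent and observe are invented for this example.

```dart
import 'dart:async';

Stream<int> numbersWithErrorEvent() async* {
  for (var i = 0; i < 5; i++) {
    yield i;
    // Emit an error event without ending the generator body.
    if (i == 2) yield* Stream<int>.error('error after $i');
  }
}

// Collects what a single listener sees for the given cancelOnError setting.
Future<List<String>> observe({required bool cancelOnError}) {
  final log = <String>[];
  final done = Completer<List<String>>();
  numbersWithErrorEvent().listen(
    (n) => log.add('data $n'),
    onError: (Object e) {
      log.add('error $e');
      // With cancelOnError: true, onDone is never called, so finish here.
      if (cancelOnError) done.complete(log);
    },
    onDone: () => done.complete(log),
    cancelOnError: cancelOnError,
  );
  return done.future;
}

Future<void> main() async {
  print(await observe(cancelOnError: false));
  print(await observe(cancelOnError: true));
}
```

With cancelOnError: false the listener should see all five data events plus the error; with cancelOnError: true the subscription is cancelled at the first error, so nothing after it is observed.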

Use yield* Stream.error() instead of throwing the error.

Example:

Stream<int> getStream() async* {
  for (var i = 0; i < 5; i++) {
    yield i;
    if (i == 2 || i == 3) {
      yield* Stream.error('Custom error at index $i');
    }
  }
}

void main(List<String> arguments) {
  var stream = getStream();
  stream.listen((event) => print('Data: $event'),
      onDone: () => print('Done'), onError: (err) => print('Error: $err'));
}

Output:

Data: 0
Data: 1
Data: 2
Error: Custom error at index 2
Data: 3
Error: Custom error at index 3
Data: 4
Done
Xenophon answered 10/3, 2023 at 16:8 Comment(0)
