Dart yield stream events from another stream listener

I have a function that generates a stream of specific events. I also have a stream coming from a storage service, which has its own events. I'm looking for a way to yield my events whenever something changes in the storage stream.

This code snippet doesn't do the trick.

Stream<BlocState> mapEventToState(
    BlocEvent event,
  ) async* {
  if (event is UploadData) {
    yield UploadDataProgress(progress: 0.0);
    final Storage storage = Storage();
    final Stream<StorageEvent> upload = storage.upload(event.data);

    upload.listen((StorageEvent storageEvent) async* {
      print('***** Listener: ${storageEvent.type} - ${storageEvent.progress}');

      if (storageEvent.type == StorageEventType.error) {
        yield UploadDataError(errorMessage: storageEvent.error);
      }

      if (storageEvent.type == StorageEventType.success) {
        yield UploadDataSuccess();
      }

      if (storageEvent.type == StorageEventType.progress) {
        yield UploadDataProgress(progress: storageEvent.progress);
      }
    });
  }
}

Output: The debug print works but the events are not sent to listeners.

***** Listener: StorageEventType.progress - 0.01924033836457124
***** Listener: StorageEventType.progress - 0.044581091468101464
***** Listener: StorageEventType.progress - 0.6986233206170177
***** Listener: StorageEventType.progress - 1.0
Betz answered 8/5, 2019 at 8:26 Comment(1)
Possible duplicate of Dart/Flutter - "yield" inside a callback function – Konstantin

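Your yields are yielding from the anonymous function (StorageEvent storageEvent) async* { ... } rather than from mapEventToState. listen() ignores whatever its callback returns, so those states never reach the stream that mapEventToState produces.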

Simply replacing the listen() with an await for should work.

Stream<BlocState> mapEventToState(
    BlocEvent event,
  ) async* {
  if (event is UploadData) {
    yield UploadDataProgress(progress: 0.0);
    final Storage storage = Storage();
    final Stream<StorageEvent> upload = storage.upload(event.data);

    await for (StorageEvent storageEvent in upload) {
      print('***** Listener: ${storageEvent.type} - ${storageEvent.progress}');

      if (storageEvent.type == StorageEventType.error) {
        yield UploadDataError(errorMessage: storageEvent.error);
      }

      if (storageEvent.type == StorageEventType.success) {
        yield UploadDataSuccess();
      }

      if (storageEvent.type == StorageEventType.progress) {
        yield UploadDataProgress(progress: storageEvent.progress);
      }
    }
  }
}
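
To see the difference in isolation, here is a minimal sketch that runs without the bloc package. The names fakeUpload, viaListen and viaAwaitFor are hypothetical stand-ins, not part of the question's code, and the stream of doubles only imitates the storage events.

```dart
import 'dart:async';

// Hypothetical stand-in for the storage service's progress events.
Stream<double> fakeUpload() async* {
  yield 0.25;
  yield 0.5;
  yield 1.0;
}

// Broken variant: the yield belongs to the callback's own stream,
// which listen() discards, so nothing reaches viaListen's stream.
Stream<String> viaListen() async* {
  yield 'start';
  fakeUpload().listen((double p) async* {
    yield 'progress $p';
  });
}

// Working variant: await for keeps the yield inside the enclosing generator.
Stream<String> viaAwaitFor() async* {
  yield 'start';
  await for (final double p in fakeUpload()) {
    yield 'progress $p';
  }
}

Future<void> main() async {
  print(await viaListen().toList());
  print(await viaAwaitFor().toList());
}
```

The first list contains only 'start', while the second also contains one entry per progress value.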
Undulation answered 8/5, 2019 at 9:38 Comment(2)
If we use Stream<double> instead of a BlocState (let's say for upload progress), is it possible to catch 'done' with await for, as with listen(onDone: ...)? – Konstantin
Yes 👋👋 dart.dev/tutorials/language/streams#receiving-stream-events – Undulation
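
As a rough illustration of that comment exchange: with await for, the code placed after the loop plays the role of onDone, and a surrounding try/catch plays the role of onError. The names fakeProgress and track below are hypothetical, chosen only for this sketch.

```dart
import 'dart:async';

// Hypothetical progress stream that either completes or fails midway.
Stream<double> fakeProgress({bool fail = false}) async* {
  yield 0.5;
  if (fail) throw StateError('upload failed');
  yield 1.0;
}

Future<String> track(Stream<double> progress) async {
  try {
    await for (final double p in progress) {
      print('progress: $p');
    }
    // Reached only after the stream closes normally (the onDone equivalent).
    return 'done';
  } catch (e) {
    // A stream error ends the loop and is rethrown here (the onError equivalent).
    return 'error';
  }
}

Future<void> main() async {
  print(await track(fakeProgress()));
  print(await track(fakeProgress(fail: true)));
}
```

The first call returns 'done' and the second returns 'error'.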

For what it's worth, I had a similar problem: I was subscribing to Firebase snapshots and trying to yield events based on the document data.

With the await-for approach, I got no subscription handle. As a result, it became painful to stop reading the stream when I wanted to close the BLoC.

I found a roundabout solution to this.

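Stream<BlocState> mapEventToState(BlocEvent event) async* {
  if (event is FetchEvent) {
    yield LoadingState();
    // Keep the subscription in a field so it can be cancelled later.
    // Each snapshot is fed back into the BLoC as an event instead of being yielded here.
    _subscription = SomeStream(event.someKey).listen((snapshot) {
      add(OnSnapshotEvent(snapshot));
    });
  }

  // Snapshots arrive here as ordinary events, where yield works as expected.
  if (event is OnSnapshotEvent) {
    if (SomeCondition(event.snapshot)) {
      yield SomeResultState();
    } else {
      yield SomeOtherState();
    }
  }
}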

This way, I have a handle to the subscription, so I can always clean up nicely.
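
The cleanup itself could look roughly like the sketch below. It uses only dart:async, and SnapshotWatcher is a hypothetical class standing in for the BLoC; in a real Bloc the cancel call would go in whatever close or dispose hook your version of the library provides.

```dart
import 'dart:async';

// Hypothetical minimal holder that mimics the pattern without the bloc package.
class SnapshotWatcher {
  final StreamController<String> _states = StreamController<String>();
  StreamSubscription<int>? _subscription;

  Stream<String> get states => _states.stream;

  void watch(Stream<int> snapshots) {
    _subscription?.cancel(); // drop any earlier subscription first
    _subscription = snapshots.listen((int s) => _states.add('snapshot $s'));
  }

  Future<void> close() async {
    await _subscription?.cancel(); // stop reading the source stream
    await _states.close();
  }
}

Future<void> main() async {
  final source = StreamController<int>();
  final watcher = SnapshotWatcher();
  final seen = <String>[];
  watcher.states.listen(seen.add);

  watcher.watch(source.stream);
  source.add(1);
  await Future<void>.delayed(Duration.zero); // let the event propagate
  await watcher.close();

  print(seen);
  print(source.hasListener); // false once the subscription is cancelled
  await source.close();
}
```

Because the subscription is stored, close() can cancel it explicitly, which is the handle that await for does not give you.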

Smoke answered 12/7, 2020 at 2:46 Comment(0)
