How do you combine 2 Riverpod StreamProviders where 1 stream depends on the data from another stream?

Sometimes I think I'm getting the logic of providers and then I get stumped for hours trying to do something like below.

I need to get a list of connection IDs from a Firestore collection stream. Easy.

However, I need to feed this streaming list of connection IDs into another Firestore collection stream. Below, you can see that my upcomingEventsStreamProvider calls ref.watch on both the database and the connectionsStream. Neither Riverpod nor Firestore throws any errors. However, in the logs, I see my print statements in this order:

returned data
returned null stream value

How am I abusing the power of Riverpod providers? Lol.

final connectionsStreamProvider = StreamProvider<List<UidConnections>>((ref) {
  final database = ref.watch(databaseProvider);
  return database != null ? database.connectionsStream() : Stream.value(null);
});

final connectionsListStateProvider = StateProvider<List>((ref) => []);

final upcomingEventsStreamProvider = StreamProvider<List<SpecialEvents>>((ref) {
  final database = ref.watch(databaseProvider);
  final connectionsStream = ref.watch(connectionsStreamProvider);
  if (database != null && connectionsStream != null) {
    connectionsStream.whenData((data) {
      if (data != null) {
        data.forEach((event) {
          if (event?.active == true && event?.connectedUid != null) {
            ref
                .read(connectionsListStateProvider)
                .state
                .add(event.connectedUid);
          }
        });
        print('returned data');
        return database.upcomingSpecialEventsStream(
            ref.read(connectionsListStateProvider).state);
      }
    });
  }

  print('returned null stream value');
  return Stream.value(null);
});

Or do I just need to refactor my Firebase Cloud Firestore query to obtain the connection ID stream first? I'd rather use Riverpod, because I'll still need a stream of connection IDs on its own.

Rhythm answered 17/3, 2021 at 17:21 Comment(5)
I am unsure if this will solve your whole issue, but you should add a return: `return connectionsStream.whenData...`, as you are returning your database.upcomingSpecialEventsStream but then not returning it from whenData. - Hugohugon
Unfortunately, I tried that, but it would only return the AsyncValue. - Rhythm
Right, but that should be an AsyncValue<Stream>, which you can read with AsyncValue<Stream>.when(data, loading, err). - Hugohugon
The return type is AsyncValue<Null> when adding return. - Rhythm
I see, @double-beep. Answer posted. - Rhythm
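
The log order in the question is consistent with how closures behave in Dart: a `return` inside the callback passed to whenData returns from the callback only, so the provider body keeps running and reaches its final return. The sketch below illustrates this in plain Dart. `FakeAsyncValue` and `providerBody` are hypothetical stand-ins, not Riverpod APIs, and the fake always invokes the callback, whereas the real AsyncValue does so only when it holds data.

```dart
// Hypothetical stand-in for AsyncValue: whenData runs the callback and wraps
// its result; it cannot return from the function that called whenData.
class FakeAsyncValue<T> {
  FakeAsyncValue(this.value);
  final T value;

  FakeAsyncValue<R> whenData<R>(R Function(T data) callback) =>
      FakeAsyncValue<R>(callback(value));
}

// Mirrors the shape of the provider body in the question.
String providerBody(List<String> log) {
  final connections = FakeAsyncValue<List<String>>(['uid-1']);
  connections.whenData((data) {
    log.add('returned data');
    return 'events stream'; // Returns from the closure only; result is dropped.
  });
  log.add('returned null stream value');
  return 'null stream'; // This is what the caller actually receives.
}

void main() {
  final log = <String>[];
  final result = providerBody(log);
  print(log);
  print(result);
}
```

Both messages are logged, in the same order as in the question, and the caller receives the fallback value rather than the value returned inside the callback.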

I'm still stumped on how to combine 2 streams with Riverpod alone. However, I did manage to resolve my issue, so here's some info for anyone else who runs into this use case, where one stream depends on data from another stream.

The solution that worked, and very well I might add, was to use Rx.combineLatest2 from rxdart. See below. Now Riverpod is happy and provides state for my combined streams. The connection IDs I needed are now part of my Account model, thanks to rxdart. I hope this is helpful to someone out there.

final accountInfoStreamProvider = StreamProvider<Account>((ref) {
  final database = ref.watch(databaseProvider);
  return database != null
      ? AccountInfoModel(database: database).accountInfoStream()
      : Stream.value(null);
});

class AccountInfoModel {
  AccountInfoModel({@required this.database});
  final FirestoreDatabase database;

  Stream<Account> accountInfoStream() {
    return Rx.combineLatest2(
      database.accountStream(),
      database.connectionsStream(),
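      (Account account, List<Connections> connections) {
        // combineLatest2 re-runs this on every emission from either stream,
        // so reset the list first to avoid accumulating duplicate ids.
        account.connections
          ..clear()
          ..addAll(connections.map((connection) => connection.documentId));
        return account;
      },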
    );
  }
}
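
For the case in the question, where the ids from one stream are the input to a second query, rxdart's switchMap may be a closer fit than combineLatest2: it subscribes to a new inner stream each time the outer stream emits and drops the previous one. A minimal sketch, assuming rxdart is a dependency; `connectionIdsStream` and `eventsFor` are hypothetical stand-ins for the Firestore streams, not part of the original code.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

// Hypothetical stand-in for database.connectionsStream(), reduced to ids.
Stream<List<String>> connectionIdsStream() async* {
  yield ['uid-1'];
  await Future<void>.delayed(const Duration(milliseconds: 50));
  yield ['uid-1', 'uid-2'];
}

// Hypothetical stand-in for database.upcomingSpecialEventsStream(ids).
Stream<List<String>> eventsFor(List<String> ids) =>
    Stream.value([for (final id in ids) 'event for $id']);

// Each new id list replaces the previous inner subscription.
Stream<List<String>> upcomingEvents() =>
    connectionIdsStream().switchMap(eventsFor);

Future<void> main() async {
  await upcomingEvents().forEach(print);
}
```

The resulting stream can be returned from a StreamProvider in the same way as accountInfoStream() above.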
Rhythm answered 18/3, 2021 at 18:31 Comment(0)

Not sure this is what you are looking for, but here is one example that works:

Stream<int> periodicStream() {
  return Stream.periodic(const Duration(seconds: 1), (i) {
    return i;
  });
}

Stream<int> simpleStream(int i) async* {
  yield i;
}

final streamProvider1 = StreamProvider((ref) => periodicStream());
final streamProvider2 = StreamProvider.family((ref, int i) => simpleStream(i));

final streamProvider3 = StreamProvider<int>((ref) async* {
  final value1 = await ref.watch(streamProvider1.future);
  final value2 = await ref.watch(streamProvider2(value1).future);
  yield value2;
});

You would then listen to streamProvider3 with ref.watch().
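
The same pattern can be tried outside a widget tree by reading the provider's `.future` from a ProviderContainer. The sketch below applies it to the shape of the original question. It assumes Riverpod 2.x, and all names in it (`connectionIdsStream`, `eventsForConnections`, and the two providers) are hypothetical stand-ins rather than the asker's actual code.

```dart
import 'package:riverpod/riverpod.dart';

// Hypothetical stand-in for the Firestore connections stream, reduced to ids.
Stream<List<String>> connectionIdsStream() =>
    Stream.value(['uid-1', 'uid-2']);

// Hypothetical stand-in for the events query that takes the ids as input.
Stream<List<String>> eventsForConnections(List<String> ids) =>
    Stream.value([for (final id in ids) 'event for $id']);

final connectionIdsProvider =
    StreamProvider<List<String>>((ref) => connectionIdsStream());

final upcomingEventsProvider = StreamProvider<List<String>>((ref) async* {
  // Wait for the latest id list. Because this uses ref.watch, the provider is
  // rebuilt, and the inner stream re-created, when connectionIdsProvider
  // emits a new list.
  final ids = await ref.watch(connectionIdsProvider.future);
  yield* eventsForConnections(ids);
});

Future<void> main() async {
  final container = ProviderContainer();
  final events = await container.read(upcomingEventsProvider.future);
  print(events);
  container.dispose();
}
```

In a Flutter app, a widget would call ref.watch(upcomingEventsProvider) instead and handle the resulting AsyncValue with when(data, loading, error).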

Pharmacology answered 29/10, 2023 at 19:44 Comment(0)
