Combine streams from Firestore in Flutter

I have been trying to listen to more than one collection from Firestore using a StreamBuilder or something similar. My original code, when I was working with only one Stream, was:

import 'package:flutter/cupertino.dart';
import 'package:flutter/material.dart';
import 'package:cloud_firestore/cloud_firestore.dart';

class List extends StatefulWidget{

  ///The reference to the collection is like
  ///Firestore.instance.collection("users").document(firebaseUser.uid).collection("list1").reference()
  final CollectionReference listReference;

  List(this.listReference);

  @override
  State createState() => new ListState();
}

class ListState extends State<List> {

  @override
  Widget build(BuildContext context){

    return new StreamBuilder(
        stream: widget.listReference.snapshots(),
        builder: (context, snapshot) {
          return new ListView.builder(
              itemCount: snapshot.data.documents.length,
              padding: const EdgeInsets.only(top: 2.0),
              itemExtent: 130.0,
              itemBuilder: (context, index) {
                DocumentSnapshot ds = snapshot.data.documents[index];
                return new Data(ds);
              }
          );
        });
  }
}

This code works fine, but now I want to listen to more than one collection. I have come across a solution that doesn't involve a StreamBuilder and works with a dynamic list. My code now looks like this:

import 'dart:async';
import 'package:flutter/cupertino.dart';
import 'package:flutter/material.dart';
import 'package:cloud_firestore/cloud_firestore.dart';
import 'main.dart';
import 'package:async/async.dart';

class ListHandler extends StatefulWidget{

  final CollectionReference listReference;

  ListHandler(this.listReference);

  @override
  State createState() => new ListHandlerState();
}

class ListHandlerState extends State<ListHandler> {

  StreamController streamController;
  List<dynamic> dataList = [];

  @override
  void initState() {
    streamController = StreamController.broadcast();
    setupData();
    super.initState();
  }

  @override
  void dispose() {
    super.dispose();
    streamController?.close();
    streamController = null;
  }

  Future<Stream> getData() async{
      Stream stream1 = Firestore.instance.collection("users").document(firebaseUser.uid).collection("list1").snapshots();
      Stream stream2 = Firestore.instance.collection("users").document(firebaseUser.uid).collection("list2").snapshots();

      return StreamZip(([stream1, stream2])).asBroadcastStream();
  }

  setupData() async {
    Stream stream = await getData()..asBroadcastStream();
    stream.listen((snapshot) {
      setState(() {
        //Empty the list to avoid repetitions when the users updates the 
        //data in the snapshot
        dataList =[];
        List<DocumentSnapshot> list;
        for(int i=0; i < snapshot.length; i++){
          list = snapshot[i].documents;
          for (var item in list){
            dataList.add(item);
          }
        }
      });
    });
  }

  @override
  Widget build(BuildContext context){
    if(dataList.length == 0){
      return new Text("No data found");
    }

    return new ListView.builder(
        itemCount: dataList.length,
        padding: const EdgeInsets.only(top: 2.0),
        itemBuilder: (context, index) {
          DocumentSnapshot ds = dataList[index];
          return new Data(ds['title']);
        }
    );
  }
}

The problem is that the ListView returns Data widgets, which are StatefulWidgets. The user can interact with them, which changes the data in Firestore and makes the following error appear:

[VERBOSE-2:dart_error.cc(16)] Unhandled exception:
setState() called after dispose(): ListHandlerState#81967(lifecycle state: defunct, not mounted)
This error happens if you call setState() on a State object for a widget that no longer appears in the widget tree (e.g., whose parent widget no longer includes the widget in its build). This error can occur when code calls setState() from a timer or an animation callback. The preferred solution is to cancel the timer or stop listening to the animation in the dispose() callback. Another solution is to check the "mounted" property of this object before calling setState() to ensure the object is still in the tree.
This error might indicate a memory leak if setState() is being called because another object is retaining a reference to this State object after it has been removed from the tree. To avoid memory leaks, consider breaking the reference to this object during dispose().

The app does not crash, and it does what is expected but it always shows this error.
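
The error text itself suggests two remedies: stop listening in dispose(), and check mounted before calling setState(). In the code above, the StreamController is closed in dispose(), but the subscription returned by stream.listen(...) is never stored or cancelled. Below is a minimal sketch of how both remedies might look, assuming current null-safe Dart and a hypothetical widget MergedList that receives an already combined Stream<List<String>> named source (neither name comes from the original code):

```dart
import 'dart:async';

import 'package:flutter/material.dart';

// Hypothetical widget: `source` stands in for whatever combined stream is used.
class MergedList extends StatefulWidget {
  const MergedList(this.source, {super.key});

  final Stream<List<String>> source;

  @override
  State<MergedList> createState() => _MergedListState();
}

class _MergedListState extends State<MergedList> {
  // Keep the subscription so that it can be cancelled in dispose().
  StreamSubscription<List<String>>? _subscription;
  List<String> _items = [];

  @override
  void initState() {
    super.initState();
    _subscription = widget.source.listen((items) {
      // Guard against events that arrive after the State has been removed.
      if (!mounted) return;
      setState(() => _items = items);
    });
  }

  @override
  void dispose() {
    // Stop listening first, then let the framework finish tearing down.
    _subscription?.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return ListView.builder(
      itemCount: _items.length,
      itemBuilder: (context, index) => Text(_items[index]),
    );
  }
}
```

Using a StreamBuilder instead of a manual listen() avoids this bookkeeping, because StreamBuilder cancels its own subscription when it is disposed.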

Some people use the rxdart library to work with streams. I have tried something like the code below, but when I put it in the StreamBuilder, only elements from one of the collections are shown:

import 'dart:async';
import 'package:flutter/cupertino.dart';
import 'package:flutter/material.dart';
import 'package:cloud_firestore/cloud_firestore.dart';
import 'main.dart';
import 'showInfo.dart';
import 'package:rxdart/rxdart.dart';

class ListHandler extends StatefulWidget{

  @override
  State createState() => new ListHandlerState();
}

class ListHandlerState extends State<ListHandler> {

  Stream getData() {
    Stream stream1 = Firestore.instance.collection("users").document(firebaseUser.uid).collection("list1").snapshots();
    Stream stream2 = Firestore.instance.collection("users").document(firebaseUser.uid).collection("list2").snapshots();

    return Observable.merge(([stream2, stream1]));
  }

  @override
  Widget build(BuildContext context){
    return new StreamBuilder(
        stream: getData(),
        builder: (context, snapshot) {
          if(!snapshot.hasData){
            print(snapshot);
            return new Text("loading");
          }
          return new ListView.builder(
              itemCount: snapshot.data.documents.length,
              padding: const EdgeInsets.only(top: 2.0),
              itemBuilder: (context, index) {
                DocumentSnapshot ds = snapshot.data.documents[index];
                return new Data(ds);
              }
          );
        });
  }
}

This is my first time working with Streams and I don't understand them very well, so I would like your thoughts on what to do.

Jamila answered 6/7, 2018 at 16:13 Comment(3)
Do you want to merge the two streams or do you only want to re-render when one of the streams emits a new value? - Brunk
I want my two streams to act like one, but getting information from two or more collections of Firebase (I assume I want to merge the two streams). - Jamila
@RogerBoschMateo Did you find any solution to merge 2 streams into 1 stream? Kindly share. Thanks. - Format
S
4

The problem is not in the merging but in how the StreamBuilder updates the UI: it rebuilds from the LATEST snapshot only. It does not accumulate snapshots; it shows whichever event was emitted last. The streams are merged, and the merged stream does carry the data of all the source streams, but the StreamBuilder displays only the most recent event. A workaround is this:

StreamBuilder<List<QuerySnapshot>>(
  stream: streamGroup,
  builder: (BuildContext context, AsyncSnapshot<List<QuerySnapshot>> snapshotList) {
    if (!snapshotList.hasData) {
      return MyLoadingWidget();
    }
    // Note that snapshotList.data is the actual list of QuerySnapshots;
    // snapshotList alone is just an AsyncSnapshot.

    // Flatten the documents of every QuerySnapshot into one list. Indexing
    // into this list keeps itemBuilder correct even when items are built out
    // of order (for example when scrolling back), which counters that are
    // mutated inside itemBuilder cannot guarantee.
    final List<DocumentSnapshot> docs =
        snapshotList.data.expand((snap) => snap.documents).toList();

    return ListView.builder(
      itemCount: docs.length,
      itemBuilder: (_, int index) {
        return Container(child: Text(docs[index].data["name"]));
      },
    );
  },
)
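
To see why a merged stream shows only one collection at a time, here is a small stand-alone sketch that uses plain dart:async and no Firestore. The names list1, list2, merged and latest are made up for the illustration, and the variable latest plays the role of the single snapshot a StreamBuilder keeps:

```dart
import 'dart:async';

Future<void> main() async {
  // Two hypothetical sources standing in for the two collection streams.
  final list1 = StreamController<List<String>>();
  final list2 = StreamController<List<String>>();

  // A hand-rolled merge: every event from either source is forwarded as is.
  final merged = StreamController<List<String>>();
  list1.stream.listen(merged.add);
  list2.stream.listen(merged.add);

  // Like StreamBuilder, keep only the most recent event.
  List<String>? latest;
  merged.stream.listen((event) => latest = event);

  list1.add(['a', 'b']);
  await Future<void>.delayed(Duration.zero); // let the event be delivered
  list2.add(['x']);
  await Future<void>.delayed(Duration.zero);

  // Only the last event survives; the data from list1 has been replaced.
  print(latest); // [x]

  await list1.close();
  await list2.close();
  await merged.close();
}
```

Both events did pass through the merged stream; the consumer simply overwrote the first with the second, which is why a stream of lists (one entry per source) is needed to display everything at once.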
Sayer answered 18/10, 2018 at 13:28 Comment(2)
Thank you! I can't test the solution as I'm no longer working on that personal project, but in the end I came to the same conclusion: the StreamBuilder will only show the very last event emitted. - Jamila
Where is this streamGroup coming from? - Merriweather
C
26

Use CombineLatestStream from rxdart to combine streams.

The StreamBuilder will build every time one of the streams emits a new event.

The result snapshot.data is a list of the latest element per stream.

Example:

StreamBuilder(
  stream: CombineLatestStream.list([
    stream0,
    stream1,
  ]),
  builder: (context, snapshot) {
    final data0 = snapshot.data[0];
    final data1 = snapshot.data[1];
    // Build and return a widget from data0 and data1 here.
  },
)
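
A minimal sketch of the same idea with explicit type arguments, which may help when the analyzer cannot infer the type of snapshot.data. It uses plain Dart with rxdart and two hypothetical StreamControllers (list1, list2) in place of the Firestore streams; with Firestore, the element type would presumably be QuerySnapshot instead of List<String>:

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

Future<void> main() async {
  // Hypothetical stand-ins for two collection streams.
  final list1 = StreamController<List<String>>();
  final list2 = StreamController<List<String>>();

  // Spell out the element type so the result is a Stream<List<List<String>>>,
  // then flatten the per-stream lists into one list.
  final Stream<List<String>> combined =
      CombineLatestStream.list<List<String>>([list1.stream, list2.stream])
          .map((lists) => lists.expand((list) => list).toList());

  combined.listen(print);

  list1.add(['a']); // nothing printed yet: list2 has not emitted
  await Future<void>.delayed(Duration.zero);
  list2.add(['x', 'y']); // prints [a, x, y]
  await Future<void>.delayed(Duration.zero);
  list1.add(['a', 'b']); // prints [a, b, x, y]
  await Future<void>.delayed(Duration.zero);

  await list1.close();
  await list2.close();
}
```

Note that nothing is emitted until every source has produced at least one event, so a loading state is still needed in the builder.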
Chape answered 4/5, 2020 at 16:32 Comment(6)
This is the actual working answer. Upvoted 👍 - Transpolar
@erlend Erlend - I have used it but I am getting the error "type 'CombineLatestStream<QuerySnapshot<Map<String, dynamic>>, List<QuerySnapshot<Map<String, dynamic>>>>' is not a subtype of type 'Stream<QuerySnapshot<Object?>>?'". Kindly suggest how to fix it. Thanks. - Format
Same for me. Did you fix it, @erlend? - Merriweather
Getting the error: The operator '[]' isn't defined for the type 'Object'. Try defining the operator '[]'. Which is weird, because when I print out snapshot.data I get [Instance of '_JsonQuerySnapshot', Instance of '_JsonQuerySnapshot']. Thoughts? - Belicia
I think you need to declare in either StreamBuilder or CombineLatestStream what type of data you expect, or cast the data to the correct type. - Chape
Worked for me using Firebase RTDB, trying to access multiple streams and their data. - Maritzamariupol
S
8

The last example should work. Perhaps the 2nd stream didn't emit any values during the time you observed the stream.

StreamGroup.merge() from the async package should also work.

StreamZip creates pairs of values, one from each stream. When the streams emit values at different rates, the faster stream waits until the other one emits a value. This is probably not what you want.
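
The difference can be seen in a small sketch that uses the async package with two made-up streams, fast() and slow(), which emit a different number of values:

```dart
import 'package:async/async.dart';

Future<void> main() async {
  // Hypothetical sources: one emits three values, the other only one.
  Stream<String> fast() => Stream.fromIterable(['a1', 'a2', 'a3']);
  Stream<String> slow() => Stream.fromIterable(['b1']);

  // merge forwards every event from either stream individually.
  final merged = await StreamGroup.merge([fast(), slow()]).toList();
  print(merged.length); // 4

  // zip waits for one value from each stream and emits them as a pair;
  // it ends as soon as one of the sources is done.
  final zipped = await StreamZip([fast(), slow()]).toList();
  print(zipped); // [[a1, b1]]
}
```

With StreamZip the values a2 and a3 are never delivered, because the slower stream has nothing left to pair them with.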

Snowbird answered 6/7, 2018 at 16:23 Comment(6)
I edited the getData() function a little. When I return Observable.merge(([stream1, stream2])) I get all the data from list2, but when I return Observable.merge(([stream2, stream1])) I get the data from list1 in the StreamBuilder. - Jamila
If I use StreamGroup.merge(([stream2, stream1])) the same thing happens. - Jamila
That doesn't sound like it is caused by the merging of the streams. - Brunk
Do you have some working code or some other tips? I can't get it to work. - Jamila
I have the same issue as @RogerBoschMateo describes. Did you figure it out? - Dion
Key phrase in the answer: "during the time you observed the stream". If these are broadcast streams and you merge them, you'll still only see the events emitted during the time you are listening. - Beery
Z
2

If you don't want a third party package, this logic is not too complex.

This method combines any number of List streams into a single stream that emits one list holding the latest values of all of them.

import 'dart:async';

Stream<List<T>> combineListStreams<T>(List<Stream<List<T>>> streams) {
  var controller = StreamController<List<T>>();
  // Streams that have not finished yet; the output closes once this is empty.
  Set<Stream<List<T>>> activeStreams = {};
  // Most recent list received from each stream.
  Map<Stream<List<T>>, List<T>> lastValues = {};
  List<StreamSubscription> subscriptions = [];

  for (var stream in streams) {
    activeStreams.add(stream);
    var subscription = stream.listen(
      (val) {
        lastValues[stream] = val;
        // Emit the concatenation of the latest list from every stream
        // that has produced a value so far.
        List<T> out = [];
        for (var list in lastValues.values) {
          out.addAll(list);
        }
        controller.add(out);
      },
      onDone: () {
        activeStreams.remove(stream);
        if (activeStreams.isEmpty) {
          controller.close();
        }
      }
    );
    subscriptions.add(subscription);
  }
  // Cancel the source subscriptions when the listener goes away.
  controller.onCancel = () {
    for (var subscription in subscriptions) {
      subscription.cancel();
    }
  };
  return controller.stream;
}
Zebra answered 26/1, 2022 at 22:21 Comment(3)
Any way you could make this work for a Firestore stream? Each stream is a Stream<QuerySnapshot<Map<String, dynamic>>>. I tried changing some stuff up but couldn't get it to work. - Belicia
It works for me, thks! - Betony
Updated this code to fix leaked stream resources. - Zebra
I
1

You might want to try concatWith from the rxdart package:

Returns a Stream that emits all items from the current Stream, then emits all items from the given streams, one after the next.

https://pub.dev/packages/rxdart

import 'package:rxdart/rxdart.dart';

Stream.fromIterable(['a', 'b', 'c']).concatWith([
  Stream.fromIterable(['d', 'e', 'f'])
]).listen(print);

This will print:

a
b
c
d
e
f

Interested answered 11/5, 2020 at 16:22 Comment(0)
