rx dart combine multiple streams to emit value whenever any of the streams emit a value
Asked Answered
M

4

6

In RX dart there is the RX.combineLatest method to combine the results of a stream using a callback function. Problem is that it only emits a value when every stream has emitted a value. If one has not it does not emit.

Merges the given Streams into a single Stream sequence by using the combiner function whenever any of the stream sequences emits an item. The Stream will not emit until all streams have emitted at least one item.

Im trying to combine multiple streams into one stream for validation which should emit false or true when the streams have not emitted or emitted an empty value.

class FormBloc {
  final BehaviorSubject<bool> _result = BehaviorSubject();
  final BehaviorSubject<String?> _usernameController = BehaviorSubject();
  final BehaviorSubject<String?> _emailController = BehaviorSubject();

  // Will only emit if each stream emitted a value
  // If only username is emitted valid is not emitted
  Stream<bool> get valid$ => Rx.combineLatest2(
    _usernameController.stream, 
    _emailController.stream, 
    (username, email) => username != null || email != null
  );

}

How can I join those streams so valid$ emits a value if any of the streams change?

Manon answered 15/12, 2021 at 9:46 Comment(0)
M
3

Because all of the solutions here are workarounds Ive implemented my own stream class. Implementation equals the original CombineLatestStream implementation except that it does not wait for all streams to emit before emitting:


import 'dart:async';

import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/subscription.dart';

class CombineAnyLatestStream<T, R> extends StreamView<R> {

  CombineAnyLatestStream(List<Stream<T>> streams, R Function(List<T?>) combiner) : super(_buildController(streams, combiner).stream);

  static StreamController<R> _buildController<T, R>(
    Iterable<Stream<T>> streams,
    R Function(List<T?> values) combiner,
  ) {

    int completed = 0;

    late List<StreamSubscription<T>> subscriptions;
    List<T?>? values;

    final _controller = StreamController<R>(sync: true);

    _controller.onListen = () {

      void onDone() {
        if (++completed == streams.length) {
          _controller.close();
        }
      }

      subscriptions = streams.mapIndexed((index, stream) {

        return stream.listen(
          (T event) {
            final R combined;

            if (values == null) return;

            values![index] = event;

            try {
              combined = combiner(List<T?>.unmodifiable(values!));
            } catch (e, s) {
              _controller.addError(e, s);
              return;
            }

            _controller.add(combined);
          },
          onError: _controller.addError,
          onDone: onDone
        );
      }).toList(growable: false);

      if (subscriptions.isEmpty) {
        _controller.close();
      } else {
        values = List<T?>.filled(subscriptions.length, null);
      }
    };

    _controller.onPause = () => subscriptions.pauseAll();
    _controller.onResume = () => subscriptions.resumeAll();
    _controller.onCancel = () {
      values = null;
      return subscriptions.cancelAll();
    };

    return _controller;
  }

}
Manon answered 15/3, 2022 at 12:48 Comment(0)
D
0

Creating new stream which emits current value and listen the stream is my best practice.

class FormBloc {
  final BehaviorSubject<bool> _result = BehaviorSubject();
  final BehaviorSubject<String?> _usernameController = BehaviorSubject();
  final BehaviorSubject<String?> _emailController = BehaviorSubject();

  final _usernameStreamController = StreamController<String?>()
    ..add(_usernameController.value)
    ..addStream(_usernameController.stream);
  final _emailStreamController = StreamController<String?>()
    ..add(_emailController.value)
    ..addStream(_emailController.stream);

  Stream<bool> get valid$ => Rx.combineLatest2(
    _usernameStreamController.stream,  // use streamController instead
    _emailStreamController.stream,  // use streamController instead
    (username, email) => username != null || email != null
  );
}
Diligent answered 12/2, 2022 at 5:59 Comment(0)
N
0

Instead of combining multiple streams, you can use one BehaviorSubject<Map<String, String?>> to emit changes in username or email.

add either changed\submitted username or email to the BehaviorSubject

_usernameEmailController.add({"uname": value},);

or

_usernameEmailController.add({"email": value},);

so that you can validate the inputs by listening to it. I used StreamBuilder to display the emitted values,

            StreamBuilder<Map<String, String?>>(
              stream: _usernameEmailController.stream
                  .map((data) {
                    _r = {..._r, ...data};
                    return _r;
                  }),
              builder: (context, snapshot) {
                return Column(
                  children: [
                    Text(snapshot.data.toString()),
                    if (snapshot.hasData)
                      Text(
                          "Is valid?: "
                          "${(snapshot.data!["uname"] != null && snapshot.data!["uname"]!.isNotEmpty) || (snapshot.data!["email"] != null && snapshot.data!["email"]!.isNotEmpty)}"
                      ),
                  ],
                );
              },
            ),

checkout the my solution on DartPad here.

In the DartPad I have used StreamController instead of BehaviorSubject as DartPad doesn't support rxdart Package. But you can replace line 40 in DartPad

final StreamController<Map<String, String?>> _usernameEmailController =
        StreamController();

with

final BehaviorSubject<Map<String, String?>> _usernameEmailController =
        BehaviorSubject();

If you want to use BehaviorSubject.

Nix answered 4/3, 2022 at 7:4 Comment(0)
C
0

You could seed your BehaviorSubjects with a default value of null:

  final BehaviorSubject<String?> _usernameController = BehaviorSubject().seeded(null);
  final BehaviorSubject<String?> _emailController = BehaviorSubject().seeded(null);

Another possibility would be to give the combined stream a seed value:

Rx.combineLatest2(
    _usernameController.stream, 
    _emailController.stream, 
    (username, email) => username != null || email != null
  ).sharedValueSeeded(false);
Cosmo answered 7/3, 2022 at 15:11 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.