Dart: how to append a transformer to an existing stream?
Asked Answered
S

3

6

I'm looking a for a way to programmatically add a transformer to an existing stream that's already being listen to.

Example:

Stream numbers = new Stream.fromIterable([0,1,2,3]);

numbers.listen((number) => print(number));

Now in response to some UI event, I'd like to modify this stream by adding a mapping transformer, as if I originally wrote:

numbers.where((number) => number % 2 == 0);

All existing listeners should from now own only receive even numbers, without interruption. How can this be done?

Sacramentalist answered 18/9, 2014 at 0:37 Comment(0)
B
12

Instead of thinking about it like "how do I dynamically insert a transformer into a stream", one possible way is to think about it like "how do I dynamically control a transformer that I already injected".

Here's an example of using a StreamTransformer:

var onlySendEvenNumbers = false; // controlled by some UI event handler

var originalStream = makeStreamOfStuff();

originalStream = originalStream.transform(new StreamTransformer.fromHandlers(
  handleData: (int value, EventSink<int> sink) {
    if (onlySendEvenNumber) {
      if (value.isEven) {
        sink.add(value);
      }
    } else {
      sink.add(value);
    }
}));

originalStream.listen(print);  // listen on events like normal
Begorra answered 18/9, 2014 at 2:8 Comment(2)
Don't you need to assign the value returned by transform to originalStream? Or does transform transform the Stream in-place?Grew
I agree. You can't inject code between a stream subscription and its listener. The subscription (which is the active part of the stream, like the iterator of an iterable) sends its values directly to the listener, and it's out of your hands. You need to add the ability to filter before allowing someone to listen, and then you can dynamically change what the filter does - because it's your filter, not the listener's. It's really like a "man-in-the-middle" attack on the stream communication - you have to be there from the start.Tomkins
G
2

One way I can think of doing that is filtering the Stream with a function that calls another function:

var filter = (n) => true;
Stream numbers = new String.fromIterable([0, 1, 2, 3]).where((n) => filter(n));

Then, when you want to change the filtering:

filter = (n) => n % 2 == 0;

A concrete example:

import 'dart:async';

main() {
  var filter = (n) => true;

  Stream numbers = new Stream.periodic(new Duration(seconds: 1), (n) => n)
      .where((n) => filter(n));

  numbers.listen((n) => print(n));

  new Future.delayed(new Duration(seconds: 4)).then((_) {
    filter = (n) => n % 2 == 0;
  });
}

This will print:

0
1
2
3
4
6
8
10
12

And so on, for even numbers only, after 4 seconds.

Grew answered 18/9, 2014 at 1:15 Comment(1)
That is what I currently use, a place holder function passed, which itself has a set of predicates (the same could be done with mappers). This works, but seems inelegant, and doesn't compose. Suppose I wanted to add an intermediate filter, I'd have to anticipate it and have a place holder inserted at code writing time.Sacramentalist
S
0

What about rxdart's combineLatest2 ?

It combine two streams, and emit each time when changed both streams.

You can use Switch class for switch on/off with conditions.

class XsBloc {
  Api _api = Api();
  BehaviorSubject<List<X>> _xs = BehaviorSubject();
  BehaviorSubject<Switcher> _switcher =
      BehaviorSubject<Switcher>.seeded(Switcher(false, []));

  XsBloc() {
    Observable.combineLatest2<List<X>, Switcher, List<X>>(
        _api.xs(), _switcher, (xs, s) {
      if (s.isOn == true) {
        return xs.where((x) => s.conditions.contains(x.id)).toList();
      } else {
        return xs;
      }
    }).listen((x) => _xs.add(x));
  }
  Stream<List<X>> get xs => _xs;

  ValueObservable<Switcher> get switcher =>
      _switcher.stream;

  Function(Switcher) get setSwitcher => _switcher.sink.add;

}

class Switcher {
  final bool isOn;
  final List<String> conditions;
  Switcher(this.isOn, this.conditions);
}

var bloc = XsBloc();

bloc.setSwitcher(true, ['A', 'B']);
bloc.setSwitcher(false, []);
bloc.setSwitcher(true, []);
Swellfish answered 27/11, 2019 at 14:8 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.