How to create a StreamTransformer in Dart?
Asked Answered
P

5

26

Trying to build a custom StreamTransformer class, however a lot of the examples out there seem to be out of date, and the one found in the documentation isn't (what some typed languages might consider anyway) as a class (found here: https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.StreamTransformer). This doesn't seem like a very Dart-like way of approaching it and rather more of a Javascript-like way (which I'm using Dart to avoid).

Many online sources say this is how you create a StreamTransformer, however there errors when extending it.

class exampleStreamTransformer extends StreamTransformer
{
  //... (This won't work)
}

'Implements' seems to be the way to go, along with implementing the bind function needed:

class exampleStreamTransformer implements StreamTransformer
{
  Stream bind(Stream stream)
  {
    //... (Go on to return new stream, etc)
  }
}

I can't seem to find any examples of this way, but have thrown something together myself (which is accepted in my IDE, but isn't accepted at runtime, I get a null object error when it tries to use pause getter):

class exampleStreamTransformer implements StreamTransformer
{
  StreamController<String> _controller;
  StreamSubscription<String> _subscription;

  Stream bind(Stream stream)
  {
    _controller = new StreamController<String>(
        onListen: ()
        {
          _subscription = stream.listen((data)
          {
            // Transform the data.
            _controller.add(data);
          },
          onError: _controller.addError,
          onDone: _controller.close,
          cancelOnError: true); // Unsure how I'd pass this in?????
        },
        onPause: _subscription.pause,
        onResume: _subscription.resume,
        onCancel: _subscription.cancel,
        sync: true
    );

    return _controller.stream;
  }
}

Would like to achieve it this way, as in the 'typed' way of producing the class, any help is much appreciated, thank you.

Presidentship answered 4/1, 2015 at 13:15 Comment(1)
I agree that StreamTransformer is an unnecessary class - it contains only one function, so you could just pass that function anywhere you pass the object. If it was added to the library now, it would just be a function type.Thomey
D
13

Okay. Here's another working example:

import 'dart:async';

class DuplicateTransformer<S, T> implements StreamTransformer<S, T> {
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOnError;

  // Original Stream
  Stream<S> _stream;

  DuplicateTransformer({bool sync: false, this.cancelOnError}) {
    _controller = new StreamController<T>(onListen: _onListen, onCancel: _onCancel, onPause: () {
      _subscription.pause();
    }, onResume: () {
      _subscription.resume();
    }, sync: sync);
  }

  DuplicateTransformer.broadcast({bool sync: false, bool this.cancelOnError}) {
    _controller = new StreamController<T>.broadcast(onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    _subscription = _stream.listen(onData,
      onError: _controller.addError,
      onDone: _controller.close,
      cancelOnError: cancelOnError);
  }

  void _onCancel() {
    _subscription.cancel();
    _subscription = null;
  }

  /**
   * Transformation
   */

  void onData(S data) {
    _controller.add(data);
    _controller.add(data); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
  }

  /**
   * Bind
   */

  Stream<T> bind(Stream<S> stream) {
    this._stream = stream;
    return _controller.stream;
  }
}

void main() {
  // Create StreamController
  StreamController controller = new StreamController.broadcast();
  // Transform
  Stream s = controller.stream.transform(new DuplicateTransformer.broadcast());

  s.listen((data) {
    print('data: $data');
  }).cancel();

  s.listen((data) {
    print('data2: $data');
  }).cancel();

  s.listen((data) {
    print('data3: $data');
  });

  // Simulate data

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

Let me add some notes:

  • Using implements seems to be the right way here when looking at the source code of other dart internal transformers.
  • I implemented both versions for regular and a broadcast stream.
  • In case of a regular stream you can call cancel/pause/resumt directly on the new stream controller because we can only listen once.
  • If you use a broadcast stream I found out that listen() is only called if there is no one listening already to the stream. onCancel behaves the same. If the last subscriber cancels its subscription, then onCancel is called. That's why it is safe to use the same functions here.
Disruption answered 4/1, 2015 at 15:28 Comment(3)
Yes, this looks like a much better implementation! Thank you, I'll try out laterPresidentship
Thanks for the implementation, this is very helpful :)Voidable
I realize this is ancient, and maybe I'm daft, but isn't there an abstraction problem regarding cancelOnError? It seems the implementation requires this to be supplied, but it's potentially not a variable you have any access to when creating the transformer: it could be supplied further downstream, wherever the responsibility for calling listen resides.Ectophyte
D
22

Why don't you use StreamTransformer.fromHandler():

import 'dart:async';

void handleData(data, EventSink sink) {
  sink.add(data*2);
}

void main() {
  StreamTransformer doubleTransformer = new StreamTransformer.fromHandlers(handleData: handleData);

  StreamController controller = new StreamController();
  controller.stream.transform(doubleTransformer).listen((data) {
    print('data: $data');
  });

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

Output:

data: 2
data: 4
data: 6
Disruption answered 4/1, 2015 at 13:47 Comment(4)
Because I need a class. Content needs to be buffered and such, it is quite complicated. Thank you anyway.Presidentship
Then make a global buffer variable that buffers data and write data to the sink as needed?Disruption
No, that's an inherently bad idea. What if I want to use this more than once? Having a global buffer would mean two streams could potentially buffer to the same place, else each time I'd have to create a separate variable somewhere outside the scope of the function. Lacks encapsulation and code maintainability. Just want to call .transform(new exampleStreamTransformer()); + even that could be shortened, but I'm rather stuck to the language conventions in this respect. Here is an old (deprecated) example: victorsavkin.com/post/51233496661/…Presidentship
"global"-anything is inherently a bad ideaEctophyte
D
13

Okay. Here's another working example:

import 'dart:async';

class DuplicateTransformer<S, T> implements StreamTransformer<S, T> {
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOnError;

  // Original Stream
  Stream<S> _stream;

  DuplicateTransformer({bool sync: false, this.cancelOnError}) {
    _controller = new StreamController<T>(onListen: _onListen, onCancel: _onCancel, onPause: () {
      _subscription.pause();
    }, onResume: () {
      _subscription.resume();
    }, sync: sync);
  }

  DuplicateTransformer.broadcast({bool sync: false, bool this.cancelOnError}) {
    _controller = new StreamController<T>.broadcast(onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    _subscription = _stream.listen(onData,
      onError: _controller.addError,
      onDone: _controller.close,
      cancelOnError: cancelOnError);
  }

  void _onCancel() {
    _subscription.cancel();
    _subscription = null;
  }

  /**
   * Transformation
   */

  void onData(S data) {
    _controller.add(data);
    _controller.add(data); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
  }

  /**
   * Bind
   */

  Stream<T> bind(Stream<S> stream) {
    this._stream = stream;
    return _controller.stream;
  }
}

void main() {
  // Create StreamController
  StreamController controller = new StreamController.broadcast();
  // Transform
  Stream s = controller.stream.transform(new DuplicateTransformer.broadcast());

  s.listen((data) {
    print('data: $data');
  }).cancel();

  s.listen((data) {
    print('data2: $data');
  }).cancel();

  s.listen((data) {
    print('data3: $data');
  });

  // Simulate data

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

Let me add some notes:

  • Using implements seems to be the right way here when looking at the source code of other dart internal transformers.
  • I implemented both versions for regular and a broadcast stream.
  • In case of a regular stream you can call cancel/pause/resumt directly on the new stream controller because we can only listen once.
  • If you use a broadcast stream I found out that listen() is only called if there is no one listening already to the stream. onCancel behaves the same. If the last subscriber cancels its subscription, then onCancel is called. That's why it is safe to use the same functions here.
Disruption answered 4/1, 2015 at 15:28 Comment(3)
Yes, this looks like a much better implementation! Thank you, I'll try out laterPresidentship
Thanks for the implementation, this is very helpful :)Voidable
I realize this is ancient, and maybe I'm daft, but isn't there an abstraction problem regarding cancelOnError? It seems the implementation requires this to be supplied, but it's potentially not a variable you have any access to when creating the transformer: it could be supplied further downstream, wherever the responsibility for calling listen resides.Ectophyte
L
11

Unlike map, transformers are more powerful and allows you to maintain an internal state, and emit a value whenever you want. It can achieve things map can't do, such as delaying, duplicating values, selectively omitting some values, and etc.

Essentially, the implementation requires a bind method that provides a new stream based on an old stream being passed in, and a cast method that helps with type-checking during run-time.

Here's an over-simplified example of implementing a "TallyTransformer" that transforms a stream of integer values into a stream of sums. For example, if the input stream so far had 1, 1, 1, -2, 0, ..., the output stream would've been 1, 2, 3, 1, 1, ..., i.e. summing all inputs up to this point.

Example usage: stream.transform(TallyTransformer())

class TallyTransformer implements StreamTransformer {
  StreamController _controller = StreamController();
  int _sum = 0; // sum of all values so far

  @override
  Stream bind(Stream stream) {
    // start listening on input stream
    stream.listen((value) {
      _sum += value; // add the new value to sum
      _controller.add(_sum); // emit current sum to our listener
    });
    // return an output stream for our listener
    return _controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    return StreamTransformer.castFrom(this);
  }
}

This example is over-simplified (but still works) and does not cover cases such as stream pausing, resuming or canceling. If you run into "Stream has already been listened" error, make sure streams are broadcasting.

Lunneta answered 20/9, 2020 at 18:18 Comment(0)
S
7

https://github.com/dart-lang/sdk/issues/27740#issuecomment-258073139

You can use StreamTransformer.fromHandlers to easily create transformers that just convert input events to output events.

Example:

new StreamTransformer.fromHandlers(handleData: (String event, EventSink output) {
  if (event.startsWith('data:')) {
    output.add(JSON.decode(event.substring('data:'.length)));
  } else if (event.isNotEmpty) {
    output.addError('Unexpected data from CloudBit stream: "$event"');
  }
});
Sarnen answered 3/11, 2016 at 6:44 Comment(0)
S
1

If you want to simply transform values using a function like this

int handleData(int data) {
  return data * 2;
}

use map method of Stream

stream
  .map(handleData)
  .listen((data) {
    print('data: $data');
  });

Full example:

import 'dart:async';

int handleData(int data) {
  return data * 2;
}

void main() {
  final controller = StreamController<int>();

  controller.stream
    .map(handleData)
    .listen((data) {
      print('data: $data');
    });

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

See more examples on dart.dev

Silsbye answered 12/6, 2020 at 22:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.