Split Rx Observable into multiple streams and process individually
Asked Answered
P

4

39

Here is a picture of what I am attempting to accomplish.

--a-b-c-a--bbb--a

split into

--a-----a-------a --> a stream

----b------bbb--- --> b stream

------c---------- --> c stream

Then, be able to

a.subscribe()
b.subscribe()
c.subscribe()

So far, everything I have found has split the stream using a groupBy(), but then collapsed everything back into a single stream and process them all in the same function. What I want to do is process each derived stream in a different way.

The way I'm doing it right now is doing a bunch of filters. Is there a better way to do this?

Penalize answered 4/3, 2015 at 12:36 Comment(0)
S
18

You don't have to collapse Observables from groupBy. You can instead subscribe to them.

Something like this:

String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};

Action1<String> a = s -> System.out.print("-a-");

Action1<String> b = s -> System.out.print("-b-");

Action1<String> c = s -> System.out.print("-c-");

Observable
    .from(inputs)
    .groupBy(s -> s)
    .subscribe((g) -> {
        if ("a".equals(g.getKey())) {
            g.subscribe(a);
        }

        if ("b".equals(g.getKey())) {
            g.subscribe(b);
        }

        if ("c".equals(g.getKey())) {
            g.subscribe(c);
        }
    });

If statements look kinda ugly but at least you can handle each stream separately. Maybe there is a way of avoiding them.

Seigneur answered 4/3, 2015 at 16:44 Comment(6)
Yeah, I would like to avoid having those ifs possibly. However, if it works out, then it'll look a bit cleaner since its all in one place, rather than doing filters on the original stream. Thanks!Penalize
Cool! I'll update my answer if I figure out how to get rid of if statements.Seigneur
You could use a Dictionary<groupKey,action>, and your group subscribe method would have to resolve the action from the dictionary and call it.Kindling
Something like Dictionary<String, Action1<String>> handlers = new Hashtable<>(); handlers.put("a", s -> System.out.println("-a-"));.... and .subscribe((g) -> { Action1<String> action = handlers.get(g.getKey()); g.subscribe(action); });Kindling
Brandon Bill, I had to reread your question to figure out that you were originally using filter. Out of curiosity, what makes you prefer this groupBy solution to the filter one? To me filter seems simpler, easier to understand, probably better performing. What advantages do you see in groupBy?Stradivari
I noticed some interesting things while debugging (note this is using Rx JS), where once a "group" had been associated with a handler, another instance of an element from the group would not go through the big subscribe function shown above. Instead it would forward directly to the Action that the group was subscribed to. Whereas, with a filter the comparisons get made for every single value of the source observable. Therefore, with N splits, a possible N comparisons will be made for every element. With groups, comparisons are only made when a new group is found.Penalize
E
43

Easy as pie, just use filter

An example in scala

import rx.lang.scala.Observable

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")

aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))

bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))

cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))

You just need to make sure that the source observable is hot. The easiest way is to share it.

Engine answered 5/3, 2015 at 9:8 Comment(4)
What if you want or initial observable to be cold?Capon
@double_squeeze just use publish instead of share and invoke connect when all of subsribers are subscribed.Mitis
There is no point in making it hot using share. Provided code actually subscribes three times - for each subscriber, same as it would without share. The right way to do it is described in Krzysztof Skyrzynecki's comment: use publish instead of share and invoke connect when all of subsribers are subscribed.Samuels
I do agree that the publish+connect method is cleaner and I would use it whenever feasible (but sometimes, you just don't know the subscribers in advance and don't care that you miss some items). However, I don't think your statement about three subscriptions to the original cold o is correct. If you can prove otherwise, please share() :) your proof with us.Stradivari
S
18

You don't have to collapse Observables from groupBy. You can instead subscribe to them.

Something like this:

String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};

Action1<String> a = s -> System.out.print("-a-");

Action1<String> b = s -> System.out.print("-b-");

Action1<String> c = s -> System.out.print("-c-");

Observable
    .from(inputs)
    .groupBy(s -> s)
    .subscribe((g) -> {
        if ("a".equals(g.getKey())) {
            g.subscribe(a);
        }

        if ("b".equals(g.getKey())) {
            g.subscribe(b);
        }

        if ("c".equals(g.getKey())) {
            g.subscribe(c);
        }
    });

If statements look kinda ugly but at least you can handle each stream separately. Maybe there is a way of avoiding them.

Seigneur answered 4/3, 2015 at 16:44 Comment(6)
Yeah, I would like to avoid having those ifs possibly. However, if it works out, then it'll look a bit cleaner since its all in one place, rather than doing filters on the original stream. Thanks!Penalize
Cool! I'll update my answer if I figure out how to get rid of if statements.Seigneur
You could use a Dictionary<groupKey,action>, and your group subscribe method would have to resolve the action from the dictionary and call it.Kindling
Something like Dictionary<String, Action1<String>> handlers = new Hashtable<>(); handlers.put("a", s -> System.out.println("-a-"));.... and .subscribe((g) -> { Action1<String> action = handlers.get(g.getKey()); g.subscribe(action); });Kindling
Brandon Bill, I had to reread your question to figure out that you were originally using filter. Out of curiosity, what makes you prefer this groupBy solution to the filter one? To me filter seems simpler, easier to understand, probably better performing. What advantages do you see in groupBy?Stradivari
I noticed some interesting things while debugging (note this is using Rx JS), where once a "group" had been associated with a handler, another instance of an element from the group would not go through the big subscribe function shown above. Instead it would forward directly to the Action that the group was subscribed to. Whereas, with a filter the comparisons get made for every single value of the source observable. Therefore, with N splits, a possible N comparisons will be made for every element. With groups, comparisons are only made when a new group is found.Penalize
C
1

I have been thinking about this and Tomas solution is OK, but the issue is that it converts the stream to a hot observable.

You can use share in combination with defer in order to get a cold observable with other streams.

For example (Java):

var originalObservable = ...; // some source
var coldObservable = Observable.defer(() -> {
    var shared - originalObservable.share();
    var aSource = shared.filter(x -> x.equals("a"));
    var bSource = shared.filter(x -> x.equals("b"));
    var cSource = shared.filter(x -> x.equals("c"));
    // some logic for sources
    return shared;
});

Chip answered 24/12, 2019 at 7:42 Comment(0)
C
1

In RxJava there is a special version of publish operator that takes a function.

ObservableTransformer {
  it.publish { shared ->
    Observable.merge(
        shared.ofType(x).compose(transformerherex),
        shared.ofType(y).compose(transformerherey)
    )
  }
}

This splits the event stream by type. Then you can process them separately by composing with different transformers. All of them share single subscription.

Correa answered 15/6, 2020 at 14:48 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.