Filter an observable using values from another observable

4

39

I have two observables:

  1. An observable representing a list of checkbox inputs.
  2. An observable representing a stream of events coming from the server.

I'd like to filter the second observable using values from the first one.

The values received from the server include a tag property, which corresponds to the values in the checkbox list. The observable resulting from combining the two should only yield values from the server whose tag property is included in the set of ticked checkboxes.

Tucci answered 19/7, 2013 at 11:54 Comment(0)
S
45

You can use withLatestFrom.

source.withLatestFrom(checkboxes, (data, checkbox) => ({data, checkbox}))
  .filter(({data, checkbox}) => ...)

Here, checkboxes is an observable representing a list of checkbox inputs, and source is an observable representing a stream of events coming from the server. In the filter function you can check whether the data is valid for the current checkbox settings and let it through.
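The check inside filter could look something like the sketch below. It assumes each server event carries a tag property (as described in the question) and that checkboxes emits an array holding the values of the ticked checkboxes. isTagChecked is a hypothetical helper name, not part of RxJS.

```javascript
// Hypothetical helper: returns true when the event's tag is among the
// currently ticked checkbox values.
// Assumed shapes: data = { tag: 'a', ... }, checkedTags = ['a', 'c'].
function isTagChecked(data, checkedTags) {
  return checkedTags.includes(data.tag);
}

// Intended use with the pair produced by withLatestFrom:
//   .filter(({data, checkbox}) => isTagChecked(data, checkbox))
```

If checkboxes emits something other than an array of values, only this helper needs to change.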

Note that checkboxes must emit at least one value before the resulting stream can emit anything.
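To make that behaviour concrete, here is a plain-JavaScript model of the semantics. It is not RxJS: it simply walks a hypothetical, time-ordered list of emissions in which each entry is marked as coming from either source or checkboxes. The event shape and the name filterWithLatest are assumptions made for illustration.

```javascript
// Plain-JavaScript model of "filter source by the latest checkboxes value".
// `events` is a time-ordered array of { from: 'source' | 'checkboxes', value }.
function filterWithLatest(events) {
  let latestChecked = [];   // most recent checkbox selection
  let hasChecked = false;   // stays false until checkboxes has emitted once
  const out = [];

  for (const event of events) {
    if (event.from === 'checkboxes') {
      latestChecked = event.value;
      hasChecked = true;
    } else if (hasChecked && latestChecked.includes(event.value.tag)) {
      // Source values are only considered once a selection is known.
      out.push(event.value);
    }
  }
  return out;
}

const result = filterWithLatest([
  { from: 'source', value: { tag: 'a' } },   // dropped: no selection yet
  { from: 'checkboxes', value: ['a'] },
  { from: 'source', value: { tag: 'a' } },   // passes
  { from: 'source', value: { tag: 'b' } },   // filtered out
]);
console.log(result.length);
```

The first source value is lost because no selection has been seen yet, which is why giving checkboxes an initial value (for example an empty list) is usually a good idea.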

P.S. Unlike the other answers, this solution works even if the source is cold.

Sidneysidoma answered 24/8, 2016 at 22:3 Comment(8)
@IonuțG.Stan Indeed. And it is surprising how many people still refer to these old questions, so hopefully it'll help a few people :)Sidneysidoma
You should note that this is annotated with @Experimental, and could change significantly at any time, so it should not be used or relied upon in production code.Placencia
@Placencia This is used in production code in many places. Where exactly did you find this @ Experimental tag?Sidneysidoma
Oh I'm sorry, I arrived at the question looking for some RxJava insight (same question but for java), and I didn't notice the rxjs tag. So I added that comment because the withLatestFrom operator is annotated with @Experimental in the java implementation (assuming version 1.X of RxJava). My bad!Placencia
@Placencia As far as I can see withLatestFrom doesn't have the experimental tag in RxJava 2.Sidneysidoma
Yeah, I know, I don't see it in the source code either. Just wanted to note that for RxJava 1, which is still actively maintained and widely used.Placencia
Tried to upvote this but looks like I'd downvoted it 9 months ago. Probably a mistake - sorry!Estis
@Estis AhA! I was wondering about that downvote for 9 months now :-)Sidneysidoma
S
6

In order to filter stream A using values of stream B, you need to observe stream B and use the latest values to filter stream A.

Map each value from B to a filtered version of A, then use switch() to flatten the result so that only the most recent filtered observable is active.

checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });

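Using switch() assumes that dataSource is a hot observable. With a cold source, every change to the checkboxes would resubscribe and restart the data stream from the beginning.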
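The hot/cold distinction matters here because switch() subscribes to dataSource again every time the filter changes. The following plain-JavaScript sketch (no RxJS; coldSource and createHotSource are hypothetical names) illustrates the difference between the two kinds of source:

```javascript
// Cold: every subscription runs the producer again from the start.
function coldSource(subscriber) {
  [1, 2, 3].forEach(subscriber);
}

// Hot: one shared producer; a subscriber only sees values emitted
// while it is subscribed.
function createHotSource() {
  const subscribers = new Set();
  return {
    subscribe(fn) {
      subscribers.add(fn);
      return () => subscribers.delete(fn);   // unsubscribe function
    },
    emit(value) {
      subscribers.forEach((fn) => fn(value));
    },
  };
}

const firstCold = [];
const secondCold = [];
coldSource((v) => firstCold.push(v));
coldSource((v) => secondCold.push(v));   // replays 1, 2, 3 again

const hot = createHotSource();
const seenHot = [];
hot.emit(1);                              // nobody is listening yet
const unsubscribe = hot.subscribe((v) => seenHot.push(v));
hot.emit(2);                              // delivered
unsubscribe();
hot.emit(3);                              // missed after unsubscribing
```

With the cold version, each new filter would see the whole sequence replayed, while the hot version just continues from wherever the shared stream currently is.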

Example using interval() to produce dummy data:

var input,
    checkedInputValuesSource,
    dataSource;

input = document.querySelectorAll('input');

// Generate source describing the current filter.
checkedInputValuesSource = Rx.Observable
    .fromEvent(input, 'change')
    .map(function () {
        var inputs = document.querySelectorAll('input'),
            checkedInputValues = [];
        
        [].forEach.call(inputs, function (e) {
            if (e.checked) {
                checkedInputValues.push(e.value);
            }
        });
        
        return checkedInputValues;
    })
    .startWith([]);

// Generate random data source (hot).
dataSource = Rx.Observable
    .interval(500)
    .map(function () {
        var options = ['a', 'b', 'c'];
    
        return options[Math.floor(Math.random() * options.length)];
    })
    .do(function (x) {
        console.log('in: ' + x);
    })
    .share();

checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>

<input type='checkbox' value='a'>
<input type='checkbox' value='b'>
<input type='checkbox' value='c'>

This example will produce output similar to:

in: c
in: a
out: a
in: b
in: c
out: a
in: b
in: a

Here, in shows every generated value and out shows the values that pass the filter. The filter is adjusted by ticking the checkbox inputs, which carry the values "a", "b" and "c".

Serviceman answered 3/8, 2015 at 6:10 Comment(3)
Isn't this exactly what I wrote in my solution?Biblical
Might be. I am going through each rxjs question/answer and contributing whenever I can. Your answer links to an external resource and requires additional dependencies to run. I have written down a solution in a sandboxed environment.Serviceman
Notice you can make this work even if the source is cold. With publish(...). Like this: dataSource .publish(dsrc => checkedInputValuesSource .switchMap((options) => dsrc .filter((value) => options.indexOf(value) !== -1) ) )Sidneysidoma
T
2

Apparently, what I needed was a combination of select, filter and switchLatest. I've written a small test case demonstrating this: https://gist.github.com/igstan/d5b8db7b43f49dd87382#file-observable-filter-observable-js-L36-L45

Tucci answered 19/7, 2013 at 12:33 Comment(5)
To be clear, your gist uses map and filter which I assume are just aliases you've made for select and where?Troublemaker
@Troublemaker yes, that's true.Biblical
Hi, I'd like to play with your code on github, which other js libs do I need?Godgiven
@GüntherSchmidt I've put together all you need to run the tests in a separate repo: github.com/igstan/rx-testing-demo. Have fun!Biblical
Thanks very much! In the meantime I've been learning how to use some of the combinators and "combineLatest" saved the day in a particularly bad problem.Godgiven
H
0

Expanding on the answer from @Dorus... In Kotlin, you can do it like so:

val observable: Observable<Data> = ...
val filter: Observable<Checkbox> = ...
val filtered: Observable<Data> =
        observable.filterWithLatestFrom(filter) { checkbox -> checkbox.isSelected }

Using the extension function:

/**
 * Return an [Observable] with type [T1] that is filtered using the last event emitted by the [other] observable.
 */
fun <T1 : Any, T2 : Any> Observable<T1>.filterWithLatestFrom(other: Observable<T2>, filterFunction: (T2) -> Boolean)
: Observable<T1> {
    return this.withLatestFrom(other) { obs1, obs2 -> Pair(obs1, obs2) }
        .filter { (_, obs2) -> filterFunction.invoke(obs2) }
        .map { (obs1, _) -> obs1 }
}
Hebrew answered 5/2, 2021 at 12:42 Comment(0)
