In order to filter stream A using values of stream B, you need to observe stream B and use the latest values to filter stream A.
Use switch()
to transform B observable to an observable producing values from A observable.
checkedInputValuesSource
.map(function (options) {
return dataSource
.filter(function (value) {
return options.indexOf(value) !== -1;
});
})
.switch()
.subscribe(function (x) {
console.log('out: ' + x);
});
Using switch()
assumes that dataSource
is a hot observable.
Example using interval()
to produce dummy data:
var input,
checkedInputValuesSource,
dataSource;
input = document.querySelectorAll('input');
// Generate source describing the current filter.
checkedInputValuesSource = Rx.Observable
.fromEvent(input, 'change')
.map(function () {
var inputs = document.querySelectorAll('input'),
checkedInputValues = [];
[].forEach.call(inputs, function (e) {
if (e.checked) {
checkedInputValues.push(e.value);
}
});
return checkedInputValues;
})
.startWith([]);
// Generate random data source (hot).
dataSource = Rx.Observable
.interval(500)
.map(function () {
var options = ['a', 'b', 'c'];
return options[Math.floor(Math.floor(Math.random() * options.length))];
})
.do(function (x) {
console.log('in: ' + x);
})
.share();
checkedInputValuesSource
.map(function (options) {
return dataSource
.filter(function (value) {
return options.indexOf(value) !== -1;
});
})
.switch()
.subscribe(function (x) {
console.log('out: ' + x);
});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>
<input type='checkbox' value='a'>
<input type='checkbox' value='b'>
<input type='checkbox' value='c'>
This example will produce output similar to:
in: c
in: a
out: a
in: b
in: c
out: a
in: b
in: a
Where in
reflects all generated input and b
the data that passes the filter. Filter is adjusted by checking the checkbox inputs, that reflect values "a", "b" and "c".