Filtering based on the last two current values in Java rx
Asked Answered
W

2

6

I'm trying to build a simple application using java reactive extensions. I have two streams that emits temperature values continuously, I want to detect and filter out spikes of sensed temperature that could be errors, for doing so I need to take account of the precedent value too so that I can take account of the variation like so:

filtering on pairs idea

Still I was unable to find the right operator in the documentation. Has anybody any idea of how can I accomplish the task? Should I make a custom operator?

These are my streams:

double min = 50, max = 75, spikeFreq = 0.01;
    Observable<Double> tempStream1 = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

    Observable<Double> tempStream2 = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

public class TempStream extends Thread{

private Subscriber<? super Double> subscriber;
private TempSensor sensor;

public TempStream(Subscriber<? super Double> subscriber, double min,
        double max, double spikeFreq) {
    this.subscriber = subscriber;
    sensor = new TempSensor(min, max, spikeFreq);
}

    @Override
    public void run() {
        Random gen = new Random(System.currentTimeMillis());
        while (!subscriber.isUnsubscribed()) {
            try {
                subscriber.onNext(sensor.getCurrentValue());
                Thread.sleep(1000 + gen.nextInt() % 1000);
            } catch (Exception ex) {
                subscriber.onError(ex);
            }
        }
        subscriber.onCompleted();
    }
}
Wadlinger answered 3/6, 2015 at 8:24 Comment(1)
It seems there is no operator to perform that. However, you can publish the stream, and zip it with itself.drop(1) to obtain a stream of pairs of values. That one will be easy to filter.Schoolhouse
Y
14

Perhaps the buffer operator (http://reactivex.io/documentation/operators/buffer.html) might help in this case. You want to use buffer with count = 2 and skip = 1. That way you'll get a "lookahead" of one element on the stream.

E.g.:

stream.buffer(2,1).filter(buf -> buf.size() == 2 && buf.get(0) - buf.get(1) < max);

Notice that this example also checks whether two values were buffered since it might happen that only one is emitted upon completion of the stream.

Yoshi answered 4/6, 2015 at 5:27 Comment(3)
How to pair with previous starting with the first (so previous would be optional and the first time it's emitted it's null)? Using scan probably?Abreast
This does not repeat values in the spike check. For example, stream of 10,20,35,40 will emit [10,20] and [35,40] instead of [10,20], [20,35], and [35,40]Perchance
@Perchance The doc seems to be wrong. I got [10,20], [20,35], and [35,40] when I tried to run the code.Deuno
M
0

It might be achieved using scan operator:

 val max = `your value`

 stream
    .scan(Pair(Int.MAX_VALUE, Int.MAX_VALUE)) { t1, t2 -> t1.second to t2 }
    .skip(1)
    .filter { (previous, current) -> current - previous < max }
    .map { it.second }

So build pairs of items, previous to current.

  • Skip initial value
  • For the first element previous will be Int.MAX_VALUE which always passes filter current - previous < max.
  • other elements are real prev/current pairs
  • get current item for pairs which passes the filter

Less clear, but encapsulated:

/** emits first item, then emits only if current value passes [filter] */
fun <T : Any> Observable<T>.filterWithPrevious(filter: (previous: T, current: T) -> Boolean): Observable<T> =
    scan(Pair(null, null)) { t1: Pair<T?, T?>, t2: T -> t1.second to t2 }
        .skip(1)
        .filter { (first, second) -> first?.let { filter(it, second!!) } ?: true }
        .map { it.second!! }

and then:

stream.filterWithPrevious { previous, current -> current - previous < max }

PS: I've written it on kotlin, for java it will be the same but verbose.

Mcfarlin answered 9/3, 2022 at 13:24 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.