RxJava: combineLatest, but only fire on one Observable's emission?

Let's say I have two infinite Observables that can emit values at any moment. They combine to create an Observable<ProcessFileEvent>.

Observable<Integer> selectedFileId = ...
Observable<MouseClick> buttonClick = ...

Observable<ProcessFileEvent> processFileEvent = Observable.combineLatest(selectedFileId, buttonClick, (s, b) -> {
    // create ProcessFileEvent here
});

The problem is that I only want processFileEvent to emit when buttonClick emits, not when selectedFileId emits. A user certainly does not expect that entering a file ID kicks off a ProcessFileEvent. How do I combine the two but only emit when buttonClick emits?

Anse answered 8/7, 2015 at 19:45 Comment(0)
S
3

Use .distinctUntilChanged() on the MouseClick object. That way you'll only get events when the MouseClick changes.

Create a class that contains both fileId and mouseClick:

static class FileMouseClick {
    final int fileId;
    final MouseClick mouseClick;

    FileMouseClick(int fileId, MouseClick mouseClick) {
        this.fileId = fileId;
        this.mouseClick = mouseClick;
    }
}

Then

Observable.combineLatest(selectedFileId, buttonClick,
                         (s, b) -> new FileMouseClick(s, b))
    .distinctUntilChanged(f -> f.mouseClick)
    .map(toProcessFileEvent());
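
To make the effect of the key selector concrete, here is a minimal plain-Java sketch of how I understand distinctUntilChanged(keySelector) to behave. It is a model of the semantics, not RxJava's implementation: the class DistinctUntilChangedModel and its demo() method are hypothetical names, and a plain Object stands in for MouseClick.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical model of distinctUntilChanged(keySelector); not RxJava code.
class DistinctUntilChangedModel {

    // Same shape as FileMouseClick above; Object stands in for MouseClick.
    static final class FileMouseClick {
        final int fileId;
        final Object mouseClick;

        FileMouseClick(int fileId, Object mouseClick) {
            this.fileId = fileId;
            this.mouseClick = mouseClick;
        }
    }

    // Keeps an item only when its key differs (via equals) from the previous item's key.
    static <T, K> List<T> distinctUntilChanged(List<T> items, Function<T, K> keySelector) {
        List<T> kept = new ArrayList<>();
        boolean first = true;
        K previousKey = null;
        for (T item : items) {
            K key = keySelector.apply(item);
            if (first || !Objects.equals(previousKey, key)) {
                kept.add(item);
            }
            previousKey = key;
            first = false;
        }
        return kept;
    }

    // Returns the file IDs that survive the filter for a sample combineLatest output.
    static List<Integer> demo() {
        Object click1 = new Object();
        Object click2 = new Object();
        List<FileMouseClick> combined = Arrays.asList(
            new FileMouseClick(1, click1),  // click1 arrives while file 1 is selected
            new FileMouseClick(2, click1),  // only the file ID changed: suppressed
            new FileMouseClick(2, click2),  // click2 arrives: passes through
            new FileMouseClick(3, click2)); // only the file ID changed: suppressed

        List<Integer> fileIds = new ArrayList<>();
        for (FileMouseClick f : distinctUntilChanged(combined, item -> item.mouseClick)) {
            fileIds.add(f.fileId);
        }
        return fileIds;
    }
}
```

Under these assumptions demo() yields the file IDs 1 and 2, one per click. Two separate clicks that compare equal via equals() would be collapsed into one, which is why the equals() behaviour of MouseClick matters for this approach.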
Samaritan answered 8/7, 2015 at 23:52 Comment(4)
This makes sense. One quick question: does distinct() rely on hashCode()/equals() for each emitted item, or does it suppress duplicate sequences? - Anse
A HashSet is used internally. If you have implemented hashCode()/equals() for MouseClick, make sure they behave the way you want; if you haven't, object reference equality is used. On reflection, distinct() is overkill here; distinctUntilChanged is what we need. I'll update. - Samaritan
Now that distinctUntilChanged is used, no HashSet is involved, but equals() is still used for the distinct check, so you still need hashCode()/equals() to behave as you desire. - Samaritan
Making this the best answer because this solution seems to be the most flexible. - Anse
I
19

Use withLatestFrom:

Observable<Integer> selectedFileId = ...
Observable<MouseClick> buttonClick = ...

Observable<ProcessFileEvent> processFileEvent = buttonClick.withLatestFrom(selectedFileId, (b, s) -> {
    // create ProcessFileEvent here
});

It only emits when the first Observable, buttonClick, emits.
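
To illustrate that behaviour without depending on a particular RxJava version, here is a small plain-Java model of the withLatestFrom semantics as I understand them. It is a sketch, not the library's implementation: WithLatestFromModel, onPrimary, onOther and demo are hypothetical names, and strings stand in for MouseClick and ProcessFileEvent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of withLatestFrom semantics; not RxJava code.
class WithLatestFromModel<T, U, R> {
    private final BiFunction<T, U, R> combiner;
    private final List<R> emitted = new ArrayList<>();
    private U latest;           // most recent value from the other source
    private boolean hasLatest;  // false until the other source has emitted once

    WithLatestFromModel(BiFunction<T, U, R> combiner) {
        this.combiner = combiner;
    }

    // The other source (selectedFileId) emitted: remember the value, emit nothing.
    void onOther(U value) {
        latest = value;
        hasLatest = true;
    }

    // The primary source (buttonClick) emitted: combine with the latest value, if any.
    void onPrimary(T value) {
        if (hasLatest) {
            emitted.add(combiner.apply(value, latest));
        }
    }

    List<R> emitted() {
        return emitted;
    }

    static List<String> demo() {
        WithLatestFromModel<String, Integer, String> model =
            new WithLatestFromModel<>((click, fileId) -> click + " -> file " + fileId);
        model.onPrimary("click0"); // no file ID yet: dropped
        model.onOther(1);          // file ID changes never emit on their own
        model.onOther(2);
        model.onPrimary("click1"); // emits with file 2
        model.onOther(3);
        model.onPrimary("click2"); // emits with file 3
        model.onPrimary("click3"); // emits with file 3 again
        return model.emitted();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model a click that arrives before any file ID has been selected is dropped, so only three results are produced for the four clicks.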

Indubitability answered 9/7, 2015 at 6:13 Comment(1)
This should be the accepted answer, as it answers the OP's question precisely. - Hermit
S
1

You can use Observable.Join to do this. Pay special attention to this paragraph:

However, what could I do to make sure that these windows did not overlap- so that, once a second value was produced I would no longer see the first value? Well, if we returned the left sequence from the leftDurationSelector, that could do the trick. But wait, when we return the sequence left from the leftDurationSelector, it would try to create another subscription and that may introduce side effects. The quick answer to that is to Publish and RefCount the left sequence. If we do that, the results look more like this.

left  |-0-1-2-3-4-5|
right |---A---B---C|
result|---1---3---5
          A   B   C

That marble diagram is what you want, where selectedFileId is the left sequence and buttonClick is the right sequence.

Salzburg answered 8/7, 2015 at 20:35 Comment(0)
M
0

This is what worked for me; the missing piece was the startWith() operator. Once you add it to the source observables, combineLatest starts emitting.

Sample

Observable.combineLatest(
    firstObservable.observeDoors("1").startWith(emptyList<Door>()),
    secondObservable.observeUnits("2").startWith(emptyList<Door>())
) { doors, units ->
    ArrayList<Door>().apply {
        addAll(units)
        addAll(doors)
    }
}.map {
    // do something
}

This works because combineLatest does not emit anything until every source has emitted at least one value. Seeding each source with startWith() satisfies that condition immediately; after that, combineLatest emits whenever any source emits.
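
The same rule can be shown with a small plain-Java model of combineLatest for two sources. This is only a sketch of the semantics under my assumptions, not RxJava's implementation; CombineLatestModel, onFirst, onSecond, withoutSeed and withSeed are hypothetical names, and the two initial "none" values play the role of startWith().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of two-source combineLatest semantics; not RxJava code.
class CombineLatestModel<A, B, R> {
    private final BiFunction<A, B, R> combiner;
    private final List<R> emitted = new ArrayList<>();
    private A lastA;
    private B lastB;
    private boolean hasA;
    private boolean hasB;

    CombineLatestModel(BiFunction<A, B, R> combiner) {
        this.combiner = combiner;
    }

    void onFirst(A value) {
        lastA = value;
        hasA = true;
        emitIfReady();
    }

    void onSecond(B value) {
        lastB = value;
        hasB = true;
        emitIfReady();
    }

    // Emits only once both sources have produced at least one value.
    private void emitIfReady() {
        if (hasA && hasB) {
            emitted.add(combiner.apply(lastA, lastB));
        }
    }

    List<R> emitted() {
        return emitted;
    }

    // Only the first source emits: nothing is combined.
    static List<String> withoutSeed() {
        CombineLatestModel<String, String, String> model =
            new CombineLatestModel<>((a, b) -> a + "+" + b);
        model.onFirst("door1");
        model.onFirst("door2");
        return model.emitted();
    }

    // Both sources are seeded first (the role startWith() plays), so later values combine.
    static List<String> withSeed() {
        CombineLatestModel<String, String, String> model =
            new CombineLatestModel<>((a, b) -> a + "+" + b);
        model.onFirst("none");
        model.onSecond("none");
        model.onFirst("door1");
        return model.emitted();
    }
}
```

The model also shows why plain combineLatest does not fit the original question: once both sources have a value, an emission from either one, including a file ID change, produces a new combined result.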

Mauromaurois answered 28/4, 2022 at 17:47 Comment(0)
