I use concatMap to process a stream of items one at a time, running a long-running operation on each. At some point I need to "interrupt" this long-running operation, but only for the current item:
@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation is in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move on to the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)
            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer));
}
I tried to solve this by retaining the disposable of the "inner" stream and disposing of it at the right time, but this didn't work. The inner stream is disposed, but concatMap never moves on to processing item 3. The test just hangs, since the outer observable never completes, errors, or gets disposed either:
Disposable disposable = Disposables.empty();

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    disposable.dispose();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)
            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer))
            .doOnSubscribe(disposable -> {
                // save disposable so we can "interrupt" later
                System.out.println("Saving disposable for " + integer);
                Example.this.disposable = disposable;
            });
}
Even if this did work, it seems a bit too hacky because it relies on a side effect. What is the best way to accomplish this?
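For reference, here is a minimal sketch of one direction I am considering. It is not a confirmed best practice. As far as I understand, disposing the inner stream only cancels it and never sends an onComplete downstream, so concatMap keeps waiting. The sketch instead ends the inner stream with takeUntil, driven by a PublishSubject, so that it completes normally. The names SkipCurrentSketch, skipSignal, work, and run are made up for illustration, and a TestScheduler replaces Thread.sleep so the timing is deterministic (an assumption; the original code uses real time).

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class SkipCurrentSketch {

    // Hypothetical stand-in for doLongRunningOperation. The inner stream
    // completes (rather than being disposed from outside) when skipSignal emits.
    static Observable<String> work(Integer integer,
                                   Observable<Object> skipSignal,
                                   TestScheduler scheduler) {
        return Observable
                .just("Start working on " + integer,
                        "Still working on " + integer,
                        "Almost done working on " + integer)
                // delay each item by 2 seconds of virtual time
                .concatMap(s -> Observable.just(s).delay(2, TimeUnit.SECONDS, scheduler))
                // takeUntil emits onComplete downstream and disposes upstream,
                // which lets the outer concatMap subscribe to the next item
                .takeUntil(skipSignal);
    }

    static List<String> run() {
        TestScheduler scheduler = new TestScheduler();
        // Each onNext on this subject means "abandon the item in progress"
        PublishSubject<Object> skipSignal = PublishSubject.create();

        TestObserver<String> test = Observable.just(1, 2, 3)
                .concatMap(i -> work(i, skipSignal, scheduler))
                .test();

        // Item 1 finishes at 6s; item 2 has emitted only its first string by 9s
        scheduler.advanceTimeBy(9, TimeUnit.SECONDS);
        skipSignal.onNext("interrupt NOW");

        // Give item 3 enough virtual time to run to completion
        scheduler.advanceTimeBy(10, TimeUnit.SECONDS);
        test.assertComplete();
        return test.values();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch, item 2 should be cut off after "Start working on 2" and the stream should continue with item 3 and then complete. The signal is still shared state, but it is passed in explicitly instead of being captured through a doOnSubscribe side effect.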