Interrupt a single observable in concatMap

I use concatMap to process a stream of items one at a time with a long-running operation. At some point I need to "interrupt" this long-running operation, but only for the current item:

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move onto the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer));
}

I tried to solve this by retaining the Disposable of the "inner" stream and disposing of it at the right time, but this didn't work. The inner stream is disposed, but concatMap never moves on to processing item 3. The test just hangs, since the outer observable never completes, terminates, or gets disposed either:

Disposable disposable = Disposables.empty();

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    disposable.dispose();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer))
            .doOnSubscribe(disposable -> {
                // save disposable so we can "interrupt" later
                System.out.println("Saving disposable for " + integer);
                Example.this.disposable = disposable;
            });
}

Even if this did work, it seems a bit too hacky because it relies on a side effect. What is the best way to accomplish this?

Invoke answered 7/4, 2019 at 1:45

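I had nearly the same question as "How to cancel individual network request in Retrofit with RxJava?". I can use a PublishSubject to "interrupt" the current item. takeUntil completes the inner observable as soon as the subject emits or completes, so concatMap moves on to the next item: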

// volatile because it is reassigned per item and read from the test thread
private volatile PublishSubject<Object> interrupter;

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    interrupter.onComplete();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    interrupter = PublishSubject.create();

    return Observable
            .just("Start working on " + integer,
                    "Still working on " + integer,
                    "Almost done working on " + integer)
            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("Finally for " + integer))
            .takeUntil(interrupter);
}
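The same idea can be made deterministic for a test. The following is only a sketch under a few assumptions that are not in the code above: it uses RxJava 2's TestScheduler for virtual time instead of Thread.sleep, it keeps a single long-lived PublishSubject and signals it with onNext rather than re-creating the subject per item, and the class name InterruptSketch is hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.concurrent.TimeUnit;

public class InterruptSketch {
    // Virtual-time scheduler so the delays elapse instantly (assumption, not in the original).
    final TestScheduler scheduler = new TestScheduler();
    // One shared subject; each onNext ends whichever item is currently in progress.
    final PublishSubject<Object> interrupter = PublishSubject.create();

    Observable<String> doLongRunningOperation(Integer integer) {
        return Observable
                .just("Start working on " + integer,
                        "Still working on " + integer,
                        "Almost done working on " + integer)
                // delay each string by 2 (virtual) seconds
                .concatMap(s -> Observable.just(s).delay(2, TimeUnit.SECONDS, scheduler))
                // complete this item early when the interrupter emits
                .takeUntil(interrupter);
    }

    TestObserver<String> run() {
        TestObserver<String> test = Observable.just(1, 2, 3)
                .concatMap(this::doLongRunningOperation)
                .test();

        // Item 1 emits at 2s, 4s, 6s; item 2 emits "Start" at 8s.
        scheduler.advanceTimeBy(9, TimeUnit.SECONDS);
        // Interrupt item 2 only; concatMap subscribes to item 3 right away.
        interrupter.onNext(new Object());
        // Item 3 emits at 11s, 13s, 15s and then the stream completes.
        scheduler.advanceTimeBy(6, TimeUnit.SECONDS);
        return test;
    }

    public static void main(String[] args) {
        new InterruptSketch().run().assertResult(
                "Start working on 1",
                "Still working on 1",
                "Almost done working on 1",
                "Start working on 2",
                "Start working on 3",
                "Still working on 3",
                "Almost done working on 3");
        System.out.println("item 2 was interrupted, item 3 still ran");
    }
}
```

Because takeUntil is applied to the whole per-item observable, one signal drops the remaining strings of item 2 and nothing else.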
Invoke answered 13/5, 2019 at 21:33

You just need a semaphore-like Observable that can trigger stop events.

For example, declare the semaphore as:

PublishSubject<Boolean> semaphore = PublishSubject.create();

Then modify the Observable created in .concatMap() like this:

.concatMap(string ->
        Observable.just(string)
                .delay(2, TimeUnit.SECONDS)
                .takeUntil(semaphore)) // here is the stop trigger

When you need to cancel the item currently being processed, just call:

semaphore.onNext(true);
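To see what this placement does in practice, here is a sketch under virtual time. It assumes RxJava 2's TestScheduler, a two-item source, and a hypothetical class name SemaphorePlacementSketch, none of which come from the snippets above. With takeUntil attached to each delayed string, the trigger cancels only the string that is currently being delayed; the remaining strings of the same item still run.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class SemaphorePlacementSketch {

    static List<String> run() {
        // Virtual time instead of real delays (assumption, not in the original).
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<Boolean> semaphore = PublishSubject.create();

        TestObserver<String> test = Observable.just(1, 2)
                .concatMap(i -> Observable
                        .just("Start working on " + i,
                                "Still working on " + i,
                                "Almost done working on " + i)
                        .concatMap(s -> Observable.just(s)
                                .delay(2, TimeUnit.SECONDS, scheduler)
                                .takeUntil(semaphore))) // stop trigger on each delayed string
                .test();

        // "Start working on 1" arrives at 2s; "Still working on 1" is pending until 4s.
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        // Cancels only the pending "Still working on 1".
        semaphore.onNext(true);
        // Let everything else finish.
        scheduler.advanceTimeBy(20, TimeUnit.SECONDS);

        test.assertComplete();
        return test.values();
    }

    public static void main(String[] args) {
        // Prints one line per emitted string.
        run().forEach(System.out::println);
    }
}
```

If the goal is to skip the rest of the current item, attaching takeUntil(semaphore) to the observable returned for the whole item, rather than to each delayed string, is the placement that does so.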
Ursulaursulette answered 27/6, 2019 at 7:28
Comment: How is this any different than my answer? I'm genuinely curious. - Invoke
