Using of "skipWhile" combined with "repeatWhen" in RxJava to implement server polling
Asked Answered
C

1

11

I really like the RxJava, it's a wonderful tool but somethimes it's very hard to understand how it works. We use Retrofit with a RxJava in our Android project and there is a following use-case:

I need to poll the server, with some delay between retries, while server is doing some job. When server is done I have to deliver the result. So I've successfully done it with RxJava, here is the code snippet: I used "skipWhile" with "repeatWhen"

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
        .skipWhile(new Func1<CheckJobResponse, Boolean>() {

            @Override
            public Boolean call(CheckJobResponse checkJobResponse) {
                boolean shouldSkip = false;

                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());

                switch (checkJobResponse.getJobStatus()){
                    case CheckJobResponse.PROCESSING:
                        shouldSkip = true;
                        break;
                    case CheckJobResponse.DONE:
                    case CheckJobResponse.ERROR:
                        shouldSkip = false;
                        break;
                }
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);

                return shouldSkip;
            }
        })
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Void> observable) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                return observable.delay(1, TimeUnit.SECONDS);
            }
        }).subscribe(new Subscriber<CheckJobResponse>(){
            @Override
            public void onNext(CheckJobResponse response) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);

            }

            @Override
            public void onError(BaseError error) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();

            }

            @Override
            public void onCompleted() {
                if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
            }
        });

The code works fine:

When server responded that job is processing I return "true" from "skipWhile" chain, the original Observable waits for 1 second and do the http request again. This process repeats until I return "false" from "skipWhile" chain.

Here is a few things I don't understand:

  1. I saw in the documentation of "skipWhile" that it will not emit anything (onError, onNext, onComplete) from original Observable until I return "false" from its "call" method. So If it doesn't emit anything why does the "repeatWhen" Observable doing it's job? It waits for one second and run the request again. Who launches it?

  2. The second question is: Why Observable from "repeatWhen" is not running forever, I mean why it stops repeating when I return "false" from "skipWhile"? I get onNext successfully in my Subscriber if I return "false".

  3. In documentation of "repeatWhile" it says that eventually I get a call to "onComplete" in my Subscriber but "onComplete" is never called.

  4. It makes no difference if I change the order of chaining "skipWhile" and "repeatWhen". Why is that ?

I understand that RxJava is opensource and I could just read the code, but as I said - it's really hard to understand.

Thanks.

Cobaltite answered 22/1, 2016 at 9:55 Comment(0)
C
15

I've not worked with repeatWhen before, but this question made me curious, so I did some research.

skipWhile does emit onError and onCompleted, even if it never returns true before then. As such, repeatWhen is being called every time checkJob() emits onCompleted. That answers question #1.

The rest of the questions are predicated on false assumptions. Your subscription is running forever because your repeatWhen never terminates. That's because repeatWhen is a more complex beast than you realize. The Observable in it emits null whenever it gets onCompleted from the source. If you take that and return onCompleted then it ends, otherwise if you emit anything it retries. Since delay just takes an emission and delays it, it's always emitting the null again. As such, it constantly resubscribes.

The answer to #2, then, is that it is running forever; you're probably doing something else outside this code to cancel the subscription. For #3, you never get onCompleted because it never completes. For #4, the order doesn't matter because you're repeating indefinitely.

The question now is, how do you get the correct behavior? It's as simple as using takeUntil instead of skipWhile. That way, you keep repeating until you get the result you want, thus terminating the stream when you want it to end.

Here's a code sample:

Observable<Boolean> source = ...; // Something that eventually emits true

source
    .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
    .takeUntil(result -> result)
    .filter(result -> result)
    .subscribe(
        res -> System.out.println("onNext(" + res + ")"),
        err -> System.out.println("onError()"),
        () -> System.out.println("onCompleted()")
    );

In this example, source is emitting booleans. I repeat every 1 second until the source emits true. I keep taking until result is true. And I filter out all notifications that are false, so the subscriber doesn't get them until it's true.

Calycle answered 22/1, 2016 at 14:25 Comment(5)
I made a typo: I saw in the documentation of "skipWhile" that it will not emit anything (onError, onNext, onComplete) from original Observable until I return "false" from its "call" method. Here is original documentation: Returns an Observable that skips all items emitted by the source Observable as long as a specified * condition holds true, but emits all further source items as soon as the condition becomes false. So I get "PROCESSING" first few times and return "true" from method. It shouldn't emit anything. Then why retry is happening?Cobaltite
Regarding #2. No I only have a code that unsubscribes the subscription in onStop of the Activity, but it's not the case. I would saw that I'm still polling the server with API calls :)Cobaltite
Regarding the first comment - the emissions of checkJob() are probably onNext(CheckJobResponse) -> onCompleted(). It skips forwarding the first one when it returns PROCESSING, but the onCompleted() still goes through. (I confirmed this by looking at the source code.)Calycle
As for #2 - I can't explain that. All I know is that, in my own tests, your setup does not terminate. Your retryWhen never calls onCompleted() so it should never terminate.Calycle
Ok. I've checked my implementation again: After I get "DONE" from the server I run another API request and then close the activity (And here I unsubscribe). That's why I didn't notice that original API call is still repeting. This answer is correct. Thanks.Cobaltite

© 2022 - 2024 — McMap. All rights reserved.