I really like RxJava. It's a wonderful tool, but sometimes it's very hard to understand how it works. We use Retrofit with RxJava in our Android project, and we have the following use case:
I need to poll the server, with some delay between retries, while the server is doing some job. When the server is done, I have to deliver the result. I've done this successfully with RxJava using "skipWhile" together with "repeatWhen". Here is the code snippet:
    Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
            .skipWhile(new Func1<CheckJobResponse, Boolean>() {
                @Override
                public Boolean call(CheckJobResponse checkJobResponse) {
                    boolean shouldSkip = false;
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
                    switch (checkJobResponse.getJobStatus()) {
                        case CheckJobResponse.PROCESSING:
                            shouldSkip = true;
                            break;
                        case CheckJobResponse.DONE:
                        case CheckJobResponse.ERROR:
                            shouldSkip = false;
                            break;
                    }
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
                    return shouldSkip;
                }
            })
            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Void> observable) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                    return observable.delay(1, TimeUnit.SECONDS);
                }
            })
            .subscribe(new Subscriber<CheckJobResponse>() {
                @Override
                public void onNext(CheckJobResponse response) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
                }

                @Override
                public void onError(BaseError error) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                    Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
                }

                @Override
                public void onCompleted() {
                    if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
                }
            });
The code works fine:
When the server responds that the job is still processing, I return "true" from the "skipWhile" function, the original Observable waits for 1 second, and then the HTTP request is made again. This repeats until I return "false" from "skipWhile".
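To make the intended behaviour concrete, here is a minimal plain-Java sketch (no RxJava at all) of the polling I am describing. The names PollingSketch, JobStatus and pollUntilFinished are made up for illustration, and the fake server is an assumption. It only models the use case, not how "skipWhile" or "repeatWhen" work internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

final class PollingSketch {

    // Hypothetical status values standing in for the CheckJobResponse constants.
    enum JobStatus { PROCESSING, DONE, ERROR }

    /**
     * Calls checkJob repeatedly until it reports something other than PROCESSING,
     * sleeping delayMillis between attempts, and returns that final status.
     * Every status received, including the skipped ones, is appended to seen.
     */
    static JobStatus pollUntilFinished(Supplier<JobStatus> checkJob,
                                       long delayMillis,
                                       List<JobStatus> seen) {
        while (true) {
            JobStatus status = checkJob.get();      // one "request" per iteration
            seen.add(status);
            if (status != JobStatus.PROCESSING) {
                return status;                      // DONE or ERROR: deliver it
            }
            try {
                Thread.sleep(delayMillis);          // wait before asking again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
                throw new IllegalStateException("polling interrupted", e);
            }
        }
    }

    public static void main(String[] args) {
        // Fake server (assumption): busy twice, then done.
        Iterator<JobStatus> fakeServer = Arrays.asList(
                JobStatus.PROCESSING, JobStatus.PROCESSING, JobStatus.DONE).iterator();
        List<JobStatus> seen = new ArrayList<>();

        JobStatus result = pollUntilFinished(fakeServer::next, 10, seen);

        // prints "result=DONE, requests=3"
        System.out.println("result=" + result + ", requests=" + seen.size());
    }
}
```

In this loop the PROCESSING responses are dropped, the delay sits between requests, and only the final DONE or ERROR status reaches the caller, which is the behaviour I observe from the RxJava chain.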
Here are a few things I don't understand:
I saw in the documentation of "skipWhile" that it will not emit anything (onError, onNext, onCompleted) from the original Observable until I return "false" from its "call" method. So if it doesn't emit anything, why does the "repeatWhen" Observable do its job? It waits for one second and runs the request again. What triggers it?
The second question is: why doesn't the Observable from "repeatWhen" run forever? I mean, why does it stop repeating when I return "false" from "skipWhile"? I successfully get onNext in my Subscriber when I return "false".
The documentation of "repeatWhen" says that I eventually get a call to "onCompleted" in my Subscriber, but "onCompleted" is never called.
It makes no difference if I change the order in which "skipWhile" and "repeatWhen" are chained. Why is that?
I understand that RxJava is open source and I could just read the code, but as I said, it's really hard to understand.
Thanks.