Android: Polling a server with Retrofit
Asked Answered
K

2

7

I'm building a 2 Player game on Android. The game works turnwise, so player 1 waits until player 2 made his input and vice versa. I have a webserver where I run an API with the Slim Framework. On the clients I use Retrofit. So on the clients I would like to poll my webserver (I know it's not the best approach) every X seconds to check whether there was an input from player 2 or not, if yes change UI (the gameboard).

Dealing with Retrofit I came across RxJava. My problem is to figure out whether I need to use RxJava or not? If yes, are there any really simple examples for polling with retrofit? (Since I send only a couple of key/value pairs) And if not how to do it with retrofit instead?

I found this thread here but it didn't help me too because I still don't know if I need Retrofit + RxJava at all, are there maybe easier ways?

Katy answered 6/2, 2015 at 15:57 Comment(2)
you don't need RxJava. But it does play well with retrofit.Harlequinade
So how do I implement polling with Retrofit without RxJava?Katy
C
15

Let's say the interface you defined for Retrofit contains a method like this:

public Observable<GameState> loadGameState(@Query("id") String gameId);

Retrofit methods can be defined in one of three ways:

1.) a simple synchronous one:

public GameState loadGameState(@Query("id") String gameId);

2.) one that take a Callback for asynchronous handling:

public void loadGameState(@Query("id") String gameId, Callback<GameState> callback);

3.) and the one that returns an rxjava Observable, see above. I think if you are going to use Retrofit in conjunction with rxjava it makes the most sense to use this version.

That way you could just use the Observable for a single request directly like this:

mApiService.loadGameState(mGameId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {

    @Override
    public void onNext(GameState gameState) {
        // use the current game state here
    }

    // onError and onCompleted are also here
});

If you want to repeatedly poll the server using you can provide the "pulse" using versions of timer() or interval():

Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(mApiService.loadGameState(mGameId))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {

    @Override
    public void onNext(GameState gameState) {
        // use the current game state here
    }

    // onError and onCompleted are also here
}).

It is important to note that I am using flatMap here instead of map - that's because the return value of loadGameState(mGameId) is itself an Observable.

But the version you are using in your update should work too:

Observable.interval(2, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> Api.ReceiveGameTurn())
.doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
.retry()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(sub);

That is, if ReceiveGameTurn() is defined synchronously like my 1.) above, you would use map instead of flatMap.

In both cases the onNext of your Subscriber would be called every two seconds with the latest game state from the server. You can process them one after another of limit the emission to a single item by inserting take(1) before subscribe().

However, regarding the first version: A single network error would be first delivered to onError and then the Observable would stop emitting any more items, rendering your Subscriber useless and without input (remember, onError can only be called once). To work around this you could use any of the onError* methods of rxjava to "redirect" the failure to onNext.

For example:

Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(new Func1<Long, Observable<GameState>>(){

    @Override
    public Observable<GameState> call(Long tick) {
        return mApiService.loadGameState(mGameId)
        .doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
        .onErrorResumeNext(new Func1<Throwable, Observable<GameState>(){
            @Override
            public Observable<GameState> call(Throwable throwable) {
                return Observable.emtpy());
            }
        });
    }
})
.filter(/* check if it is a valid new game state */)
.take(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {

    @Override
    public void onNext(GameState gameState) {
        // use the current game state here
    }

    // onError and onCompleted are also here
}).

This will every two seconds: * use Retrofit to get the current game state from the server * filter out invalid ones * take the first valid one * and the unsubscribe

In case of an error: * it will print an error message in doOnNext * and otherwise ignore the error: onErrorResumeNext will "consume" the onError-Event (i.e. your Subscriber's onError will not be called) and replaces it with nothing (Observable.empty()).

And, regarding the second version: In case of a network error retry would resubscribe to the interval immediately - and since interval emits the first Integer immediately upon subscription the next request would be sent immediately, too - and not after 3 seconds as you probably want...

Final note: Also, if your game state is quite large, you could also first just poll the server to ask whether a new state is available and only in case of a positive answer reload the new game state.

If you need more elaborate examples, please ask.

UPDATE: I've rewritten parts of this post and added more information in between.

UPDATE 2: I've added a full example of error handling with onErrorResumeNext.

Coffeepot answered 8/2, 2015 at 19:27 Comment(2)
I will have to take a look at error handling later. But don't we use doOnError for error handling?Katy
That depends on what you mean by "handling". doOnError simply does something when an error is passed down the Observable pipe line. But it is not really meant, I think, as the place to resolve the error. By default, errors can be handled in onError - but that also terminates the Observable entirely. There are other options to react to errors, that keep the subscription intact: emit something else instead (onErrorResumeNext), retry, etc.Coffeepot
K
1

Thank you, I finally made it in a similar way based the post I referred to in my question. Here's my code for now:

Subscriber sub =  new Subscriber<Long>() {
        @Override
        public void onNext(Long _EmittedNumber)
        {
            GameTurn Turn =  Api.ReceiveGameTurn(mGameInfo.GetGameID(), mGameInfo.GetPlayerOneID());
            Log.d("Polling", "onNext: GameID - " + Turn.GetGameID());
        }

        @Override
        public void onCompleted() {
            Log.d("Polling", "Completed!");
        }

        @Override
        public void onError(Throwable e) {
            Log.d("Polling", "Error: " + e);
        }
    };

    Observable.interval(3, TimeUnit.SECONDS, Schedulers.io())
            // .map(tick -> Api.ReceiveGameTurn())
            // .doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
            .retry()
            .subscribe(sub);

The problem now is that I need to terminate emitting when I get a positive answer (a GameTurn). I read about the takeUntil method where I would need to pass another Observable which would emit something once which would trigger the termination of my polling. But I'm not sure how to implement this. According to your solution, your API method returns an Observable like it is shown on the Retrofit website. Maybe this is the solution? So how would it work?

UPDATE: I considered @david.miholas advices and tried his suggestion with retry and filter. Below you can find the code for the game initialization. The polling should work identically: Player1 starts a new game -> polls for opponent, Player2 joins the game -> server sends to Player1 opponent's ID -> polling terminated.

    Subscriber sub =  new Subscriber<String>() {
        @Override
        public void onNext(String _SearchOpponentResult) {}

        @Override
        public void onCompleted() {
            Log.d("Polling", "Completed!");
        }

        @Override
        public void onError(Throwable e) {
            Log.d("Polling", "Error: " + e);
        }
    };

    Observable.interval(3, TimeUnit.SECONDS, Schedulers.io())
            .map(tick -> mApiService.SearchForOpponent(mGameInfo.GetGameID()))
            .doOnError(err -> Log.e("Polling", "Error retrieving messages: " + err))
            .retry()
            .filter(new Func1<String, Boolean>()
            {
                @Override
                public Boolean call(String _SearchOpponentResult)
                {
                    Boolean OpponentExists;
                    if (_SearchOpponentResult != "0")
                    {
                        Log.e("Polling", "Filter " + _SearchOpponentResult);
                        OpponentExists = true;
                    }
                    else
                    {
                        OpponentExists = false;
                    }
                    return OpponentExists;

                }
            })
            .take(1)
            .subscribe(sub);

The emission is correct, however I get this log message on every emit:

E/Polling﹕ Error retrieving messages: java.lang.NullPointerException

Apperently doOnError is triggered on every emit. Normally I would get some Retrofit debug logs on every emit which means that mApiService.SearchForOpponent won't get called. What do I do wrong?

Katy answered 8/2, 2015 at 19:55 Comment(11)
Maybe you could explain in more detail what exactly you want to achieve: At first I thought your wanted to poll the server every two seconds for the whole time your program is running, but now you say you want to terminate the emission of items after you get a successful reply from the server... You could of course just insert a take(1) before the subscribe to limit the emission to one item, but you could also have a look a retryWhen - maybe that is closer to what you actually want...Coffeepot
It is a turn-based game, so a player waits/polls the server for changes. When the other player sets an input he sends his turn, which is stored in a database. The first player then reveices a positive MySQL result with the turn information, which is needed to update the UI and terminate polling because the information was received. The official Rx wiki says: " if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source" which doesn't help me atmKaty
And the version you posted did what you want except that it didn't stop after one emitted item? In that case, you can really just put take(1) between retry() and subscribe() - that way take would only "let through" one item and then unsubscribe and thus stop the generation of ticks in interval. However, I think in case of e.g. a network error retry would resubscribe to the interval immediately - and since interval emits the first Integer immediately upon subscription the next request would be sent immediately, too - and not after 3 seconds as you probably want...Coffeepot
It has to stop emitting after a valid GameTurn not after emitting one. How would retry help me out exactly? Should I call my request in onCompleted instead of onNext? In case of a network error it might be a help. But an invalid answer to my reuqest is not a program error, it indicates that just nothing happened yet (the other player didn't perform his turn yet). So there is a differentation between an error and a negative response. On negative response -> continue emitting, on error -> tell the UI "error happened" and do something against it (also terminate emit, start over, whatever)Katy
OK, yeah, that makes sense - so, can I determine if the MySQL result is valid, i.e. a new turn made by the other player, just by looking at the result itself or do I need to compare it with the previous game state stored locally? If all I need is the response itself you could for example filter out any unwanted/invalid/not-new items - they would simply disappear from the stream and take(1) would work again.Coffeepot
The GameTurn itself is valid, this was ensured at the point of sending. So the only condition I need to know is "Is this response a GameTurn or a string with 'Nope, nohting happened yet'" (maybe I need a better mechanism...). What do you mean by filtering?Katy
Oh, OK, so you cannot guarantee that Retrofit will be able to parse your response to a GameTurn object, but instead have some String... still you could use rxjava's filter method, which takes a Func1<YourType, Boolean> which checks whether some condition is true for the YourType object and returns true or false accordingly. The filter operator will in turn either just forward that item (if the condition was met) or drop it (if the condition was not met). See here: reactivex.io/documentation/operators/filter.htmlCoffeepot
I am not sure I understand your question: You get the log message from doOnError every 3 seconds, but you don't see any Retrofit logging messages? If that is the case, are you sure that Retrofit logging is enabled? You should see some Retrofit output regardless of what happens in doOnError afterwards. You could try to remove the retry and have a look at the Throwable that's delivered to the onError in your Subscriber.Coffeepot
Finally I got it. It was my fault, the mGameInfo wasn't set correctly at creating a new game, such a shame! So the code above works! Although it still seems to me more like workaround rather than a solution, take(1), filter, retry sounds kinda messy instead setting a terminating condition for periodical emitting, but I will use this for now, thanks a lot!Katy
I'm glad it works now! I think you won't get around the combination of filter (to identify valid new game states) and take(1) (to stop polling after the first one). As I said, I am also not quite comfortable with the retry, mainly because I think it will trigger a new network request immediately after the last one failed - did you check for that?Coffeepot
No, it actually keeps the intervals. In my (trivial) case this wouldn't be a problem though, I just need repeated emission.Katy

© 2022 - 2024 — McMap. All rights reserved.