RxJava: How to refresh a token when multiple requests are sent at the same time?

I have an app that authenticates using OAuth2 and fetches data from a RESTful service using Retrofit. Now, I have the token retrieval and refreshing up and running. The token is refreshed like so (schedulers omitted):

// Each Retrofit call observable is wrapped using this method
protected <T> Observable<T> wrap(@NonNull final Observable<T> page) {
    return authenticate()
        .concatMap(token -> page)
        .onErrorResumeNext(throwable -> {
            Log.w(TAG, "wrap: ErrorResumeNext", throwable);
            return refreshAccessToken()
                .flatMap(accessToken -> page);
        });
}

// Retrieves the access token if necessary
Observable<AccessToken> authenticate() {
    // Already have token
    if(accessToken != null) return Observable.just(accessToken);
    // No token yet, fetch it
    return api.getAccessToken(...);
}

// Refreshes the token
Observable<AccessToken> refreshAccessToken() {
    return api.refreshToken(...);
}

This works, but in some cases multiple requests are sent at once and each of them triggers the refresh, so my app ends up refreshing the token as many times as there were requests in flight at that moment.

So, the question is: How do I ensure that when the token needs to be refreshed, it is refreshed only once, no matter how many ongoing requests need the new token? Can I somehow make the other requests "wait" until the first request has successfully retrieved the new token?

Mikol answered 24/1, 2017 at 14:19 Comment(0)

We have accomplished this behavior using a hot observable for refreshing the token and providing access to its instance for all requests that failed to authenticate.

Use the share() operator to turn the cold observable that refreshes the token into a hot one, so every subscriber that arrives while the refresh is in flight shares its result. Once the response comes back, all waiting observers are notified. At that moment, destroy the refreshing observable instance so that the next subscriber creates a new one; in the operator chain this goes into a doOnUnsubscribe callback placed right before share(). All of this is easy to achieve with a singleton: wrap the refreshing observable in a singleton wrapper class and obtain it through getInstance(). If no refresh is in progress (the instance is null), getInstance() should create a new one.

There are some other things you need to take care of, for example an error during the refresh or invalidating the token altogether, but these are the basics.
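
To make the "one refresh in flight, shared by everyone, cleared when it finishes" idea concrete, here is a minimal sketch of the same pattern without RxJava. It uses java.util.concurrent.CompletableFuture in place of share() and doOnUnsubscribe, so it is an analogue of the approach described above, not the RxJava code itself. The class name SharedTokenRefresher, the String token type, and the refreshCall supplier (a stand-in for api.refreshToken(...)) are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: lets all callers share a single in-flight token refresh. */
final class SharedTokenRefresher {
    // Stand-in for api.refreshToken(...); assumed to start one network call per invocation.
    private final Supplier<CompletableFuture<String>> refreshCall;

    // The refresh currently running, or null when none is in progress.
    private CompletableFuture<String> inFlight;

    SharedTokenRefresher(Supplier<CompletableFuture<String>> refreshCall) {
        this.refreshCall = refreshCall;
    }

    /** Returns the running refresh if there is one, otherwise starts a new one. */
    synchronized CompletableFuture<String> refresh() {
        CompletableFuture<String> current = inFlight;
        if (current == null) {
            final CompletableFuture<String> started = refreshCall.get();
            inFlight = started;
            // Clear the slot on success or failure so the next caller starts a fresh refresh.
            started.whenComplete((token, error) -> clear(started));
            current = started;
        }
        return current;
    }

    private synchronized void clear(CompletableFuture<String> finished) {
        if (inFlight == finished) {
            inFlight = null;
        }
    }
}
```

Every request that fails to authenticate would call refresh() and retry once the returned future completes. Because the slot is cleared on failure as well as on success, a failed refresh does not get stuck in the cache, which covers the error case mentioned above.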

I don't have much time right now to elaborate on this, but if you run into trouble implementing it on your own, leave a comment and I will post some code examples by tomorrow. They wouldn't make much sense without context.

Goudy answered 24/1, 2017 at 15:12 Comment(1)
This seems to do the trick! I had this at one point, but it didn't work back then. I probably had something else wrong, because now it works just as intended. - Mikol
