RxJava error handling in chained API calls

I have been playing with RxJava recently, trying to implement a chain of events (API calls/database operations), and seem to have hit a roadblock when it comes to handling errors.

This is what I am trying to do. I call an API that checks whether a user exists in the database. Based on the response I get, I want to chain different sequences using RxJava. The following diagram might explain it a little better.

                          checkUser()
                         /          \
                       No           Yes
                       /              \
            createUserRemote()       FetchUserNotesRemote()
                      |                    |
                    End               SaveUserNotesLocal()
                                            |
                                           End

I am able to chain together checkUser() -> FetchUserNotesRemote() -> SaveUserNotesLocal() sequence with the following code.

checkUser()
            .flatMap(id -> {return fetchData(id);})
            .flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                }

                @Override
                public void onError(Throwable e) {
                    //handle errors here
                }
            });

The issues I am mainly trying to solve:

  • I can't figure out how to handle the case where checkUser() returns
    a 404 HTTP status. When that happens, the subscriber's onError
    method gets called, which seems to me to be what should happen. How can I handle it so that when I get an error (404) response from the API, instead of executing FetchUserNotesRemote() and SaveUserNotesLocal(), I execute a different chain of events?
  • Another thing I am not sure about: if an error is raised by any of the observables in a chain, how does the subscriber's onError method know which observable raised it?
Darra answered 4/4, 2018 at 5:2 Comment(0)

1) To execute a different chain of observables on error, you can use the onErrorResumeNext() method. More info here: github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

Example:

checkUser().flatMap(id -> {return fetchData(id);})
           .flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
           .onErrorResumeNext(throwable -> { return doSomethingDifferent(); })
           .subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                }

                @Override
                public void onError(Throwable e) {
                    //handle errors here
                }
            });

2) If an exception is thrown anywhere in your stream, it is passed down to the subscriber's onError(). If you want to know in which part of the stream the error was thrown, you can add multiple onErrorResumeNext() calls, one per API call, each rethrowing a specific exception.

    checkUser()
           .onErrorResumeNext(throwable -> Single.error(new CheckUserException()))
           // Wrap each inner call separately; an operator placed after flatMap() would also catch upstream errors.
           .flatMap(id -> fetchData(id)
                   .onErrorResumeNext(throwable -> Single.error(new FetchDataException())))
           .flatMap(notesResponseObject -> saveFetchedData(notesResponseObject)
                   .onErrorResumeNext(throwable -> Single.error(new SaveDataException())))
           .subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                }

                @Override
                public void onError(Throwable e) {
                    //handle errors here
                }
            });
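
The fallback in point 1 fires for every error, not only for a missing user. Below is a minimal sketch of a check that narrows it to 404, assuming the API layer reports failures through an exception that carries the HTTP status. `HttpStatusException` and `Fallbacks.isNotFound` are hypothetical names for illustration; they are not part of RxJava or of the question's code.

```java
// Hypothetical exception type: assumes the API layer reports the HTTP status this way.
final class HttpStatusException extends Exception {
    private final int statusCode;

    HttpStatusException(int statusCode) {
        super("HTTP " + statusCode);
        this.statusCode = statusCode;
    }

    int getStatusCode() {
        return statusCode;
    }
}

// Hypothetical helper holding the "should we fall back?" decision.
final class Fallbacks {
    private Fallbacks() {}

    // True only for a "not found" response; every other failure should propagate.
    static boolean isNotFound(Throwable error) {
        return error instanceof HttpStatusException
                && ((HttpStatusException) error).getStatusCode() == 404;
    }
}
```

In the chain from point 1, the lambda could then become something like `throwable -> Fallbacks.isNotFound(throwable) ? doSomethingDifferent() : Single.error(throwable)`, so that non-404 failures still reach onError(). Placed at the end of the chain, this would also match a 404 coming from a later call.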
Edholm answered 4/4, 2018 at 6:47 Comment(4)
Thank you for your response. I will try this out and let you know. - Darra
Hi @noob, did you manage to get this working? I am facing a similar situation: the onErrorResumeNext method does not get called when the API times out for me. Could you post the solution? - Baedeker
@Baedeker I posted an answer. - Darra
@Subayyal Somehow onErrorResumeNext() is not called when I add it to the request chain. - Baedeker

I completely forgot about this. But @mol pushed me in the right direction. My solution was a bit different. This may not be the best solution but it worked for me at the time.

I first created my own custom exception classes (CheckUserException, CreateUserLocalException, and so on), each like the following.

public class CreateUserLocalException extends Exception {
    public CreateUserLocalException(String message) {
        super(message);
    }
}

Then in my checkUser() function I throw one of these custom exceptions, as follows.

public Single<String> checkUser(String id) {
    return Single.create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(SingleEmitter<String> emitter) throws Exception {
            try {
                GetUserResponseObject getUserResponseObject = apiClient.usersIdGet(id);
                Log.d("Test", "checkUserCall: Status: " + getUserResponseObject.getStatus());
                emitter.onSuccess(getUserResponseObject.getBody().getUserId());
            } catch (AmazonServiceException e) {
                Log.d("Test", "AmazonServiceException : " + e.getErrorMessage());
                e.printStackTrace();
                if ("timeout".equals(e.getErrorMessage())) { // null-safe comparison
                    throw new SocketTimeoutException();
                } else {
                    throw new CheckUserException(Integer.toString(e.getStatusCode()));
                }
            } catch (Exception e) {
                e.printStackTrace();
                throw new CheckUserException(Integer.toString(AppConstants.ERROR));
            }
        }
    });
}

Then in my chain of calls, in the event of an error, onError(throwable) gets invoked, where I use instanceof to identify what kind of exception occurred. Below is the code for the chain of functions.

cloudSyncHelper.checkUser(user.getUser_id())
        .retry(3, new Predicate<Throwable>() {
            @Override
            public boolean test(Throwable throwable) throws Exception {
                Log.d("Test", throwable.toString());
                if (throwable instanceof SocketTimeoutException) {
                    Log.d("Test", "Time out.. Retrying..");
                    return true;
                }
                return false;
            }
        })
        .flatMap(s -> {
            return cloudSyncHelper.createUserLocal(user)
                    .onErrorResumeNext(throwable -> {
                        Log.d("Test", "onErrorResumeNext, throwable message: " + throwable.getMessage());
                        if (throwable instanceof CreateUserLocalException) {
                            if (Integer.parseInt(throwable.getMessage()) == AppConstants.LOCAL_DB_DUPLICATE) {
                                return Single.just(user.getUser_id());
                            }
                        }
                        return Single.error(new CreateUserLocalException(Integer.toString(AppConstants.LOCAL_DB_ERROR)));
                    });
        })
        .flatMap(id -> {
            return cloudSyncHelper.fetchData(id)
                    .retry(3, new Predicate<Throwable>() {
                        @Override
                        public boolean test(Throwable throwable) throws Exception {
                            Log.d("Test", throwable.toString());
                            if (throwable instanceof SocketTimeoutException) {
                                Log.d("Test", "Time out.. Retrying..");
                                return true;
                            }
                            return false;
                        }
                    });
        })
        .flatMap(notesResponseObject -> {
            return cloudSyncHelper.saveFetchedData(notesResponseObject);
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onSuccess(Integer integer) {
                //handle onsuccess here
                googleSignInButton.setEnabled(true);
                progressBar.setVisibility(View.GONE);
                Log.d("Test", "onSuccess Called");
                getSharedPreferences(AppConstants.AppName, MODE_PRIVATE).edit().putBoolean("isFirstRun", false).apply();
                startActivity(new Intent(LoginScreen.this, HomeScreen.class));
            }

            @Override
            public void onError(Throwable e) {

                if (e instanceof SocketTimeoutException) {
                    googleSignInButton.setEnabled(true);
                    progressBar.setVisibility(View.GONE);
                    Log.d("Test", "Socket Time Out");
                    Utils.createToast(LoginScreen.this, "Socket timed out");
                    return;
                }

                int code = Integer.parseInt(e.getMessage());
                Log.d("Test", "onError Called");
                if (e instanceof CheckUserException) {
                    Log.d("Test", "onError CheckUserException");
                    if (code == AppConstants.NOTFOUND) {
                        newUserSequence(user);
                    } else {
                        googleSignInButton.setEnabled(true);
                        progressBar.setVisibility(View.GONE);
                        Utils.createToast(LoginScreen.this, "Unable to get user information from cloud. Try again.");
                    }
                }
                if (e instanceof CreateUserLocalException) {
                    Log.d("Test", "onError CreateUserLocalException");
                    googleSignInButton.setEnabled(true);
                    progressBar.setVisibility(View.GONE);
                }
                if (e instanceof FetchDataException) {
                    Log.d("Test", "onError FetchDataException");
                    if (code == AppConstants.NOTFOUND) {
                        googleSignInButton.setEnabled(true);
                        progressBar.setVisibility(View.GONE);
                        getSharedPreferences(AppConstants.AppName, MODE_PRIVATE).edit().putBoolean("isFirstRun", false).apply();
                        startActivity(new Intent(LoginScreen.this, HomeScreen.class));
                    } else {
                        googleSignInButton.setEnabled(true);
                        progressBar.setVisibility(View.GONE);
                        Log.d("Test", "Unable to fetch data from cloud");
                        Utils.createToast(LoginScreen.this, "Unable to fetch data from cloud. Try again.");
                    }
                }
                if (e instanceof SaveDataLocalException) {
                    googleSignInButton.setEnabled(true);
                    progressBar.setVisibility(View.GONE);
                    Log.d("Test", "onError SaveDataLocalException");
                    if (code == AppConstants.LOCAL_DB_ERROR) {
                        Log.d("Test", "Unable to save data fetched from cloud");
                        Utils.createToast(LoginScreen.this, "Unable to save data fetched from cloud");
                    } else {
                        Utils.createToast(LoginScreen.this, "Unable to save data fetched from cloud");
                    }
                }
            }
        });
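
Two parts of the chain above could be factored out: the timeout check is written twice, and onError() calls Integer.parseInt() on a message that is not guaranteed to be numeric. The sketch below shows one possible way to do that. `ChainErrors`, `isTimeout`, `codeOf` and `UNKNOWN_CODE` are hypothetical names, and it assumes the convention used above where the exception message holds the numeric code.

```java
import java.net.SocketTimeoutException;

// Hypothetical helper class; not part of RxJava or of the original code.
final class ChainErrors {
    // Hypothetical sentinel returned when the message holds no numeric code.
    static final int UNKNOWN_CODE = -1;

    private ChainErrors() {}

    // Shared retry rule: only timeouts are worth retrying.
    static boolean isTimeout(Throwable error) {
        return error instanceof SocketTimeoutException;
    }

    // Reads the numeric code stored in the message without throwing
    // when the message is missing or not a number.
    static int codeOf(Throwable error) {
        String message = error.getMessage();
        if (message == null) {
            return UNKNOWN_CODE;
        }
        try {
            return Integer.parseInt(message.trim());
        } catch (NumberFormatException notNumeric) {
            return UNKNOWN_CODE;
        }
    }
}
```

With this in place, both retry calls could be written as `.retry(3, ChainErrors::isTimeout)`, and onError() could start with `int code = ChainErrors.codeOf(e);` so that an unexpected exception does not crash inside the error handler.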

Hope this helps.

Darra answered 24/9, 2019 at 23:54 Comment(0)