Error with Flowable's onErrorResumeNext, networkOnMainThread

I have the following RxJava chain:

 override fun combineLocationToPlace(req: Flowable<Place>): Flowable<Place> {
        var combinedFlowable = Flowable
                .combineLatest(
                        req,
                        getLastLocation().lastOrError().toFlowable(),
                        BiFunction<Place, Location, Place> { t1, location ->
                            Timber.w("FIRSTINIT - Retrieved location $location")
                            var placeLocation = Location(t1.placeName)
                            placeLocation.latitude = t1.latitude
                            placeLocation.longitude = t1.longitude
                            t1.distance = location.distanceTo(placeLocation)
                            t1
                        })


        return combinedFlowable
                .onErrorResumeNext { t: Throwable ->
                    Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
                    req
                }
                .doOnError {
                    Timber.w("FIRSTINIT - did detect the error here...")
                }

    }

In short, I am retrieving some data from the local database (a place) and I want to combine it with the latest location from the GPS:

 override fun getLastLocation(requestIfEmpty: Boolean): Observable<Location> {
        var lastLocation = locationProvider.lastKnownLocation
                .doOnNext {
                    Timber.w("Got location $it from last one")
                }
                .doOnComplete {
                    Timber.w("did i get a location?")
                }

        if (requestIfEmpty) {
            Timber.w("Switching to request of location")
            lastLocation = lastLocation.switchIfEmpty(requestLocation())
        }

        return lastLocation.doOnNext {
            Timber.w("Got something!")
            location = it
        }


    }

But I want to account for the scenario where the user does not have a last location, hence these lines:

return combinedFlowable
                    .onErrorResumeNext { t: Throwable ->
                        Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
                        req
                    }
                    .doOnError {
                        Timber.w("FIRSTINIT - did detect the error here...")
                    }

Which is trying to catch any error and retry just with the original request without combining it with anything. I am calling this code like this:

fun getPlace(placeId: String) {
        locationManager.combineLocationToPlace(placesRepository.getPlace(placeId))
                .onErrorResumeNext { t: Throwable ->
                    Timber.e(t, "Error resuming next! ")
                    placesRepository.getPlace(placeId)
                }.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
                .subscribeBy(
                        onNext = {
                            place.value = Result.success(it)
                        },
                        onError = {
                            Timber.e("ERROR! $it")
                            place.value = Result.failure(it)
                        }
                )
                .addTo(disposables)

    }

However, when there is no location, a NoSuchElementException is thrown, my Flowable switches to the original request, and upon executing it I get a NetworkOnMainThreadException. Shouldn't this request be executed on schedulerProvider.io(), since the fallback appears before the subscribeOn call in the chain?

In case you are wondering, schedulerProvider.io() translates to:

Schedulers.io()

GetPlace:

  /**
     * Retrieves a single place from database
     */
      override fun getPlace(id: String): Flowable<Place> {
        return Flowable.merge(placesDao.getPlace(id),
                refreshPlace(id).toFlowable())
    }

    /**
     * Triggers a refreshPlace update on the db, useful when changing stuff associated with the place
     * itself indirectly (e.g. an experience)
     */
    private fun refreshPlace(id: String): Single<Place> {
        return from(placesApi.getPlace(id))
                .doOnSuccess {
                    placesDao.savePlace(it)
                }
    }
Vada answered 19/7, 2018 at 11:43 Comment(4)
Academy: onErrorResumeNext subscribes to the fallback on the thread which signaled the error. You should apply req.subscribeOn(Schedulers.io()) to make sure the side effects there happen on the right thread. How far an observeOn or subscribeOn reaches depends on which operators are involved and is non-trivial to guess correctly. Therefore, always apply subscribeOn and/or observeOn when you are in doubt.
Vada: But I am already doing that...
Academy: Which is the original request you are talking about, req? I don't see any req.subscribeOn. Also, if you have an error, why didn't you post the stack trace, which could shed some light on where the problem may be?
Vada: You are right, appending subscribeOn(schedulerProvider.io()) to the original request fixed the issue. Thanks!
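The behavior described in the comments can be reproduced in isolation. The following is a minimal sketch, assuming RxJava 2 (`io.reactivex`) on the classpath; `callbackScheduler`, `fallbackThread` and the `String` payload are hypothetical stand-ins for the location callback thread, the test harness and `Place`, not code from the question.

```kotlin
import io.reactivex.Flowable
import io.reactivex.functions.Function
import io.reactivex.schedulers.Schedulers
import org.reactivestreams.Publisher
import java.util.concurrent.Executors

// Hypothetical stand-in for the thread that delivers location callbacks
// (on Android this would typically be the main thread).
private val callbackScheduler = Schedulers.from(
    Executors.newSingleThreadExecutor { r ->
        Thread(r, "callback-thread").apply { isDaemon = true }
    }
)

/** Returns the name of the thread on which the fallback request ran. */
fun fallbackThread(fallbackOnIo: Boolean): String {
    // Stand-in for `req`: emits the name of the thread that subscribes to it.
    val req: Flowable<String> = Flowable.fromCallable { Thread.currentThread().name }
    val fallback = if (fallbackOnIo) req.subscribeOn(Schedulers.io()) else req

    return Flowable.error<String>(NoSuchElementException("no last location"))
        .subscribeOn(callbackScheduler) // the error is signalled on callback-thread
        .onErrorResumeNext(Function<Throwable, Publisher<String>> { fallback })
        .subscribeOn(Schedulers.io())   // only decides where the chain is first subscribed
        .blockingFirst()
}
```

Under these assumptions, `fallbackThread(false)` should report `callback-thread` even though the chain ends with `subscribeOn(Schedulers.io())`, while `fallbackThread(true)` should report one of the `RxCachedThreadScheduler` I/O threads. That mirrors the fix that resolved the question: apply `subscribeOn` to `req` itself.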

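To ensure your networking happens off the main thread, send it to a background thread explicitly.

Use one of the schedulers from the Rx Schedulers class. Schedulers.io() is the usual choice for blocking network calls; Schedulers.newThread() also works, while Schedulers.computation() is meant for CPU-bound work:

subscribeOn(Schedulers.io())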

For cases when you don't want to do this (or if you believe that it should already be on a background thread and just want to debug), you can log thread information as follows:

Thread.currentThread().getName()
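As a minimal sketch of that kind of logging, assuming RxJava 2 (`io.reactivex`): the hypothetical helper `threadsSeen` below records the thread name at the source and again after `observeOn`, collecting the entries in a list instead of a logger so the result can be inspected.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList

/** Records which thread runs the source and which runs the downstream callback. */
fun threadsSeen(): List<String> {
    val log = CopyOnWriteArrayList<String>()

    Flowable.fromCallable {
        // Runs where the chain is subscribed, i.e. on the subscribeOn scheduler.
        log.add("source: ${Thread.currentThread().name}")
        42
    }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single())
        // Runs after observeOn, i.e. on the observeOn scheduler.
        .doOnNext { log.add("downstream: ${Thread.currentThread().name}") }
        .blockingSubscribe()

    return log
}
```

The first entry should name an I/O scheduler thread and the second the single scheduler thread. In an Android app the same `Thread.currentThread().name` expression can be passed to Timber inside `doOnSubscribe` or `doOnNext`.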

This can be particularly useful for tracking what is going on when you are using Schedulers.trampoline(), or if you have different schedulers for observe and subscribe like in your example:

.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
Stubbs answered 25/7, 2018 at 13:41 Comment(0)

You can't use onErrorResumeNext to implement this functionality. You should use the retryWhen operator.

Maybe this post is useful to you:

https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a

This code polls a server with backoff until it receives a response code other than 204. Maybe you can adapt it to your needs by using retryWhen instead of repeatWhen.

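  // Imports assume RxJava 2, Retrofit 2 and OkHttp 3
  import io.reactivex.Flowable
  import io.reactivex.functions.BiFunction
  import io.reactivex.schedulers.Schedulers
  import okhttp3.ResponseBody
  import retrofit2.Response
  import java.io.IOException
  import java.util.concurrent.TimeUnit

  fun pollServerWithBackoff(videoId: String, maxAttempts: Int, delay: Int): Flowable<Response<ResponseBody>> {
        return api.download(videoId)
                .subscribeOn(Schedulers.io())
                // Resubscribe after each completion, at most maxAttempts times,
                // waiting attempt * delay seconds before each new attempt
                .repeatWhen { completions ->
                    completions
                            .zipWith(Flowable.range(1, maxAttempts),
                                    BiFunction { _: Any?, attempt: Int -> attempt })
                            .flatMap { attempt ->
                                Flowable.timer((attempt * delay).toLong(), TimeUnit.SECONDS)
                            }
                }
                // Stop polling as soon as the server answers with anything but 204
                .takeUntil({
                    it.code() != 204
                })
                // Drop the intermediate 204 responses
                .filter {
                    it.code() != 204
                }
                .map {
                    // 2xx is success; anything else becomes an error
                    if (it.code() in 200..299)
                        it
                    else
                        // string() reads the body text; toString() would only describe the object
                        throw IOException(it.errorBody()?.string() ?: "Unknown Error")
                }
    }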
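For the retryWhen variant suggested above, a minimal sketch could look like the following, assuming RxJava 2. The extension function `retryWithBackoff` and its parameters are hypothetical names introduced for illustration; they do not come from the linked post.

```kotlin
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

/**
 * Resubscribes to the source after an error, at most maxRetries times,
 * waiting attempt * delayMs milliseconds before each retry.
 * Once the retries are used up, the last error is passed downstream.
 */
fun <T> Flowable<T>.retryWithBackoff(maxRetries: Int, delayMs: Long): Flowable<T> =
    retryWhen { errors ->
        errors
            // Pair each error with its attempt number; the extra slot in the
            // range lets the error after the final retry be rethrown.
            .zipWith(
                Flowable.range(1, maxRetries + 1),
                BiFunction<Throwable, Int, Int> { error, attempt ->
                    if (attempt > maxRetries) throw error
                    attempt
                }
            )
            // Emitting a value after the delay triggers the resubscription.
            .flatMap { attempt -> Flowable.timer(attempt * delayMs, TimeUnit.MILLISECONDS) }
    }
```

A call such as `placesRepository.getPlace(placeId).retryWithBackoff(3, 500)` would then retry the request up to three times. Note that this retries the same source, whereas onErrorResumeNext switches to a different one, so which operator fits depends on whether a retry or a fallback is wanted.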
Euridice answered 25/7, 2018 at 14:10 Comment(0)
