I have the following rxJava chain:
override fun combineLocationToPlace(req: Flowable<Place>): Flowable<Place> {
var combinedFlowable = Flowable
.combineLatest(
req,
getLastLocation().lastOrError().toFlowable(),
BiFunction<Place, Location, Place> { t1, location ->
Timber.w("FIRSTINIT - Retrieved location $location")
var placeLocation = Location(t1.placeName)
placeLocation.latitude = t1.latitude
placeLocation.longitude = t1.longitude
t1.distance = location.distanceTo(placeLocation)
t1
})
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
return combinedFlowable
}
In short, I am retrieving some data from the local database (a place) and I want to combine it with the latest location from the GPS:
override fun getLastLocation(requestIfEmpty: Boolean): Observable<Location> {
var lastLocation = locationProvider.lastKnownLocation
.doOnNext {
Timber.w("Got location $it from last one")
}
.doOnComplete {
Timber.w("did i get a location?")
}
if (requestIfEmpty) {
Timber.w("Switching to request of location")
lastLocation = lastLocation.switchIfEmpty(requestLocation())
}
return lastLocation.doOnNext {
Timber.w("Got something!")
location = it
}
}
But I want to account for the scneario where the user does not have a last location, and thus the line:
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
Which is trying to catch any error and retry just with the original request without combining it with anything. I am calling this code like this:
fun getPlace(placeId: String) {
locationManager.combineLocationToPlace(placesRepository.getPlace(placeId))
.onErrorResumeNext { t: Throwable ->
Timber.e(t, "Error resuming next! ")
placesRepository.getPlace(placeId)
}.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
.subscribeBy(
onNext = {
place.value = Result.success(it)
},
onError = {
Timber.e("ERROR! $it")
place.value = Result.failure(it)
}
)
.addTo(disposables)
}
However, when there is no location a NoSuchElementException
is thrown, my flowable switches to the original request, and then upon executing it I get a NetworkOnMainThread
exception. Shouldn't this request be executed on the scheduler.io()
that I put in there (since I put the code before that)?
In case you are wondering, schedulerProvider.io()
translates to:
Schedulers.io()
GetPlace:
/**
* Retrieves a single place from database
*/
override fun getPlace(id: String): Flowable<Place> {
return Flowable.merge(placesDao.getPlace(id),
refreshPlace(id).toFlowable())
}
/**
* Triggers a refreshPlace update on the db, useful when changing stuff associated with the place
* itself indirectly (e.g. an experience)
*/
private fun refreshPlace(id: String): Single<Place> {
return from(placesApi.getPlace(id))
.doOnSuccess {
placesDao.savePlace(it)
}
}
onErrorResumeNext
subscribes to the fallback on the thread which signaled the error. You should applyreq.subscribeOn(Schedulers.io())
to make sure the side-effects there happen on the right thread. As for how far anobserveOn
orsubscribeOn
reaches depends on what operators are involved and is a non-trivial task to guess correctly. Therefore, always applysubscribeOn
and/orobserveOn
when you are in doubt. – Academyreq
? I don't see anyreq.subscribeOn
. Also if you have an error, why didn't you post the stacktrace which could shed some light about where the problem may be? – Academy