While subscribing to a Reactive Extensions Flowable stream, I noticed that the stream hangs (no further items are emitted, and no error is returned) after 128 items have been emitted.
val download: Flowable<DownloadedRecord> = sensor.downloadRecords()
download
    .doOnComplete { Log.i( "TEST", "Finished!" ) }
    .subscribe(
        { record ->
            Log.i( "TEST", "Got record: ${record.record.id}; left: ${record.recordsLeft}" )
        },
        { error ->
            Log.i( "TEST", "Error while downloading records: $error" )
        } )
This is most likely related to Reactive Extensions: I discovered that the default buffer size of Flowable is 128, which is unlikely to be a coincidence.
While trying to understand what is happening, I ran into the following documentation on Flowable.subscribeOn:
If there is a create(FlowableOnSubscribe, BackpressureStrategy) type source up in the chain, it is recommended to have requestOn false to avoid same-pool deadlock because requests may pile up behind an eager/blocking emitter.
Although I do not quite understand what a same-pool deadlock is in this situation, it looks like something similar is happening to my stream.
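To make the phrase "requests may pile up behind an eager/blocking emitter" concrete, here is a minimal, library-free sketch of the idea. It is an analogy built on plain java.util.concurrent executors, not RxJava's actual implementation: the function emittedCount, the 1000-item total, and the 96-item replenish step are all hypothetical choices for illustration, and only the initial 128 mirrors the default buffer size mentioned above. A blocking emitter occupies the only thread of its pool, and the consumer's "request more" signal is either applied directly or queued on that same pool.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (not an RxJava API): returns how many items a blocking
// emitter manages to emit before it finishes or the timeout expires.
fun emittedCount(requestOnEmitterPool: Boolean, total: Int = 1000, timeoutMs: Long = 1000): Int {
    val emitterPool = Executors.newSingleThreadExecutor()  // the "same pool"
    val consumerPool = Executors.newSingleThreadExecutor()
    val requested = AtomicLong(128)   // initial demand, like the default buffer size
    val emitted = AtomicInteger(0)
    val done = CountDownLatch(1)
    var consumed = 0                  // only touched on the consumer thread

    emitterPool.execute {
        try {
            // Eager/blocking emitter: holds the pool's only thread until it is done.
            while (emitted.get() < total) {
                if (requested.get() == 0L) {
                    Thread.sleep(1)   // wait for more demand
                    continue
                }
                requested.decrementAndGet()
                emitted.incrementAndGet()
                consumerPool.execute {
                    consumed++
                    if (consumed % 96 == 0) {
                        // The consumer asks for 96 more items.
                        val replenish = Runnable { requested.addAndGet(96) }
                        if (requestOnEmitterPool) {
                            // Queued BEHIND the emitter on its single thread: never runs.
                            emitterPool.execute(replenish)
                        } else {
                            // Applied directly on the consumer thread.
                            replenish.run()
                        }
                    }
                }
            }
            done.countDown()
        } catch (e: InterruptedException) {
            // Interrupted by shutdownNow() after the timeout: the emitter was stuck.
        }
    }

    done.await(timeoutMs, TimeUnit.MILLISECONDS)
    emitterPool.shutdownNow()
    consumerPool.shutdownNow()
    return emitted.get()
}

fun main() {
    println("requests queued on the emitter's pool: ${emittedCount(true)} items")
    println("requests applied directly:             ${emittedCount(false)} items")
}
```

In this sketch, the first call should stall once the initial 128 items are used up, because the replenishing task sits in the queue of a thread that never becomes free, while the second call should run through all 1000 items. Whether this is what happens in my actual stream is still an open question; the sketch only illustrates the mechanism the documentation seems to describe.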
1. What is a same-pool deadlock in Reactive Extensions? What would be a minimal code sample to recreate it (on Android)?
Currently at a loss, I tried applying .subscribeOn( Schedulers.io(), false ) before .subscribe, without really understanding what this does, but my stream still locks up after 128 items have been emitted.
2. How could I go about debugging this issue, and how/where can it be resolved?
.window( ) in the stream which takes a Flowable.create as a boundary indicator, used to slice up upstream items into the eventual records to be downloaded. This seems a likely candidate for something to go wrong. Currently investigating. – Eumenides