Avoiding same-pool deadlocks when using Flowable in Reactive Extensions
While subscribing to a Reactive Extensions Flowable stream, I noticed that the stream halts/hangs (no further items are emitted and no error is returned) after 128 items have been emitted.

val download: Flowable<DownloadedRecord> = sensor.downloadRecords()
download
    .doOnComplete { Log.i( "TEST", "Finished!" ) }
    .subscribe(
        { record ->
            Log.i( "TEST", "Got record: ${record.record.id}; left: ${record.recordsLeft}" )
        },
        { error ->
            Log.i( "TEST", "Error while downloading records: $error" )
        } )

Most likely, this is related to Reactive Extensions. I discovered the default buffer size of Flowable is set to 128; unlikely to be a coincidence.
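For reference, the default can be checked at runtime. A minimal sketch, assuming RxJava 2, where the value is seeded from the rx2.buffer-size system property before Flowable is first used:

import io.reactivex.Flowable

fun main() {
    // Prints 128 unless overridden via the rx2.buffer-size system property,
    // which is read once when the Flowable class is initialized.
    println( "Default Flowable buffer size: ${Flowable.bufferSize()}" )
}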

While trying to understand what is happening, I ran into the following documentation on Flowable.subscribeOn.

If there is a create(FlowableOnSubscribe, BackpressureStrategy) type source up in the chain, it is recommended to have requestOn false to avoid same-pool deadlock because requests may pile up behind an eager/blocking emitter.

Although I do not quite understand what a same-pool deadlock is in this situation, it looks like something similar is happening to my stream.

1. What is a same-pool deadlock in Reactive Extensions? What would be a minimal code sample to recreate it (on Android)?

Currently at a loss, I tried applying .subscribeOn( Schedulers.io(), false ) before .subscribe, without really understanding what this does, but my stream still locks up after 128 items have been emitted.

2. How could I go about debugging this issue, and how/where can it be resolved?

Eumenides answered 9/1, 2019 at 19:2 Comment(3)
I rely on a .window() in the stream which takes a Flowable.create as a boundary indicator, used to slice up upstream items into the eventual records to be downloaded. This seems like a likely candidate for something to go wrong. Currently investigating. – Eumenides
I increased the default buffer size to 130, after which it hangs after 130 emitted records, confirming the problem is related to this parameter. – Eumenides
The root cause of my problem turned out not to be a same-pool deadlock, but rather a bug in RxJava. However, this question still makes sense, and the bugfix does not really answer it. Therefore, I will keep it around. – Eumenides
  1. What is a same-pool deadlock in Reactive Extensions?

RxJava's standard schedulers use single-threaded workers under the hood. When a blocking or eager source is emitting items, it occupies that single thread. Even though the downstream requests more items, subscribeOn schedules those request calls onto the same worker, behind the currently running/blocking emitter, which therefore never gets notified of the new demand.

What would be a minimal code sample to recreate it (on Android)?

Why would you want code that deadlocks?
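For illustration, though, here is a minimal sketch that reproduces the 128-items-then-hang symptom, assuming RxJava 2 on a plain JVM (the same chain behaves identically inside an Android app). The emitter deliberately spins on emitter.requested() to stand in for any eager/blocking source:

import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main() {
    val source = Flowable.create<Int>( { emitter ->
        var i = 0
        while (!emitter.isCancelled) {
            // Eager/blocking emitter: spin until the downstream has requested
            // something. This keeps the scheduler's worker thread occupied.
            while (emitter.requested() == 0L) {
                if (emitter.isCancelled) return@create
            }
            emitter.onNext( i++ )
        }
    }, BackpressureStrategy.ERROR )

    source
        // requestOn = true (also the default of the one-argument subscribeOn):
        // downstream request() calls are rescheduled onto the same worker that
        // is currently stuck inside the create() lambda above.
        .subscribeOn( Schedulers.io(), true )
        // observeOn prefetches 128 (the default buffer size) and re-requests
        // in batches of 96 once 75% have been consumed.
        .observeOn( Schedulers.computation() )
        .blockingSubscribe { item -> println( "Got item: $item" ) }
}

Items 0 through 127 are printed and then the program hangs: the replenishing request(96) issued by observeOn is scheduled behind the busy emitter on the same worker and never runs. Changing requestOn to false (or writing the emitter so it does not block the worker while waiting for demand) lets the stream continue.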

I tried applying .subscribeOn( Schedulers.io(), false )

What is your actual flow? You likely applied subscribeOn too far from the source, where it has no effect. The most reliable placement is right next to create.
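For instance, a sketch of the recommended placement, using a hypothetical recordSource() in place of whatever Flowable.create source sits behind sensor.downloadRecords():

import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

// Hypothetical stand-in for the created source feeding the record stream.
fun recordSource(): Flowable<Int> =
    Flowable.create<Int>( { emitter ->
        var i = 0
        while (i < 1000 && !emitter.isCancelled) {
            emitter.onNext( i++ )   // eager, potentially blocking emission
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER )
        // Right next to create: the blocking emission runs on io(), and
        // requestOn = false keeps downstream request() calls from being
        // rescheduled behind this emitter on the same worker.
        .subscribeOn( Schedulers.io(), false )

fun main() {
    recordSource()
        .doOnComplete { println( "Finished!" ) }
        .blockingSubscribe { i -> println( "Got record: $i" ) }
}

Note that an inner source handed to another operator (such as the window boundary mentioned in the comments) forms its own chain: its requests are issued by that operator and never pass through a subscribeOn applied further downstream, which is one more reason to attach the operator directly to the create call.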

How could I go about debugging this issue, and how/where can it be resolved?

Put doOnNext and doOnRequest at various places in the chain and see where the signals disappear.
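A sketch of that kind of instrumentation, reusing the download stream and log tag from the question (the positions shown are just examples; in the real chain the probes would be spread around the window/create operators):

download
    // Closest to the source: logs the request amounts that actually reach
    // downloadRecords() after passing through everything below it.
    .doOnRequest { n -> Log.i( "TEST", "Source sees request($n)" ) }
    .doOnNext { record -> Log.i( "TEST", "Source emitted: ${record.record.id}" ) }
    // ... the real operators (window and friends) would sit in between ...
    // Closest to the subscriber: logs what the final consumer requests.
    .doOnRequest { n -> Log.i( "TEST", "Subscriber requested $n" ) }
    .doOnNext { record -> Log.i( "TEST", "Subscriber received: ${record.record.id}" ) }
    .subscribe(
        { record -> Log.i( "TEST", "Got record: ${record.record.id}" ) },
        { error -> Log.i( "TEST", "Error while downloading records: $error" ) } )

If the subscriber-side doOnRequest keeps logging replenishment requests while the source-side one goes quiet, the requests are being lost or parked somewhere in between; if the source-side doOnNext stops logging while requests still arrive, the source itself has stalled.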

Antiphonary answered 10/1, 2019 at 8:15 Comment(2)
"Why would you want code that deadlocks?" To understand it better. :) The equivalent of a picture says more than a thousand words.Eumenides
github.com/ReactiveX/RxJava/…Antiphonary
