If it safe to have blocking operation inside flatMap { ... } mapper function?
Asked Answered
A

1

1

I'd like to organize a thread barrier: given a single lock object, any thread can obtain it and continue thread's chain further, but any other thread will stay dormant on the same lock object until the first thread finishes and releases the lock.

Let's express my intention in code (log() simply prints string in a log):

val mutex = Semaphore(1)  // number of permits is 1

source
.subscribeOn(Schedulers.newThread())  // any unbound scheduler (io, newThread)
.flatMap {
    log("#1")
    mutex.acquireUninterruptibly()
    log("#2")
    innerSource
       .doOnSubscribe(log("#3"))
       .doFinally {
          mutex.release()
          log("#4")
       }
}
.subscribe()

It actually works well, i can see how multiple threads show log "#1" and only one of them propagates further, obtaining lock object mutex, then it releases it and i can see other logs, and next threads comes into play. OK

But sometimes, when pressure is quite high and number of threads is greater, say 4-5, i experience DEADLOCK:

Actually, the thread that has acquired the lock, prints "#1" and "#2" but it then never print "#3" (so doOnSubscribe() not called), so it actually stops and does nothing, not subscribing to innerSource in flatMap. So all threads are blocked and app is not responsive at all.

My question - is it safe to have blocking operation inside flatMap? I dig into flatMap source code and i see the place where it internally subscribes:

if (!isDisposed()) {
    o.subscribe(new FlatMapSingleObserver<R>(this, downstream));
}

Is it possible that thread's subscription, that has acquired lock, was disposed somehow?

Alanealanine answered 6/9, 2019 at 8:6 Comment(5)
Makes no sense given that Flowable has backpressure and you can limit the concurrency on flatMap.Gownsman
let me clarify a bit: it's not a source that emits too quickly. It's actually many different places in different threads that can call this method, that assembles the chain with source->**flatMap**->**innerSource** and subscribes on it.Alanealanine
what is the use case for this over using making it single threaded, regradless of calling thread i.e val scheduler = Schedulers.from(Executors.newSingleThreadExecutor()) .... source.subscribeOn(scheduler)....Kristof
yes, probably subscribeOn() directive is excessiveAlanealanine
I put it here because the enclosing function, that is called from different threads, could be also called from threads that reside in Schedulers.computation() pool, and since this function involves some I/O operations, it's unsafe, (possible same-pool deadlock, see more here: [link] (#54117179)). I just wanted to exclude same-pool deadlock from the possible causes. Or maybe i'm wrong i still have same-pool deadlockAlanealanine
P
0

You can use flatMap second parameter maxConcurrency and set it to 1, so it does what you want without manually locking

Pyoid answered 7/9, 2019 at 10:18 Comment(3)
SingleFlatMap does not support maxConcurrency since there is only one upstream source. I'm not using a hot source to emit some values that is then subscribed on and used concurrently. I have some function that is called from different threads, that function operates with Singles and does some work, emits single value and then either continues innerSource or blocks waiting for the lock.Alanealanine
Ok, instead of creating Single for each caller, can you create Processor and make it hot observable, so that all callers use the same stream?Pyoid
I'm not sure i can, because there is some annoyance: let's say the code above, except of subscribe() call is a body of some method, say triggerSource(). This is not called standalone, but it's part of the chain: ``` triggerSource() .flatMap { otherSource } .subscribe() ``` each thread has some work to do, and it should preliminary call triggerSource() and wait for it's results, then proceeds the chain. So, the main job requires triggerSource() to be done, and it may block until another thread finishes it's triggerSource() part (and proceed it's own chain further).Alanealanine

© 2022 - 2024 — McMap. All rights reserved.