is there a straightforward way to implement something like this
Depends on your measure of straightforward. Here is how I would do it.
Backpressure translates to programmatical suspension and resumption in the coroutines world. For onBackpressureDrop
, the downstream has to indicate it is ready for one item and suspend for it while the upstream should never wait for the downstream to be ready.
You have to consume the upstream in an unbounded manner and hand over items and terminal events to the downstream waiting for those signals.
package hu.akarnokd.kotlin.flow.impl
import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
: AbstractFlow<T>() {
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
coroutineScope {
val consumerReady = AtomicBoolean()
val producerReady = Resumable()
val value = AtomicReference<T>()
val done = AtomicBoolean()
val error = AtomicReference<Throwable>();
launch {
try {
source.collect {
if (consumerReady.get()) {
value.set(it);
consumerReady.set(false);
producerReady.resume();
}
}
done.set(true)
} catch (ex: Throwable) {
error.set(ex)
}
producerReady.resume()
}
while (true) {
consumerReady.set(true)
producerReady.await()
val d = done.get()
val ex = error.get()
val v = value.getAndSet(null)
if (ex != null) {
throw ex;
}
if (d) {
break;
}
collector.emit(v)
}
}
}
}
Note: Resumable implementation.
So let's walk through the implementation.
First, one needs 5 variables to hand over information between the collector of the upstream and the collector working for the downstream:
- consumerReady
indicates the downstream is ready for the next item,
- producerReady
indicates the producer has stored the next item (or terminal signal) and the downstream can resume
- value
the upstream item ready for consumption
- done
the upstream has ended
- error
the upstream has failed
Next, we have to launch the collector for the upstream because collect is suspending and would not let the downstream consumer loop run at all until completion. In this collector, we check if the downstream consumer is ready (via consumerReady
) and if so, store the current item, clear the readiness flag and signal its availability via producerReady
. Clearing the consumerReady
will prevent subsequent upstream items to be stored until the downstream itself indicates a new readiness.
When the upstream ends or crashes, we set the done
or error
variables and indicate the producer has spoken.
After the launch { }
part, the we'll now keep consuming the shared variables on behalf of the downstream collector.
The first thing in each round is to indicate we are ready for the next value, then wait for the producer side signal it has placed the next event in the shared variable(s).
Next, we collect values from these variables. We are eager the complete or throw an error, and only as a last resort re-emit the upstream item to the downstream collector.