Kotlin Flow onBackpressureDrop RxJava2 analog

In RxJava 2's Flowable there are different backpressure strategies; among them, the most interesting are:

  • LATEST
  • BUFFER
  • DROP

which are respected throughout the whole Rx chain.

In Kotlin there is Flow, which declares that it has backpressure support out of the box. I was able to give Flow the BUFFER and LATEST strategies by using the following:

For BUFFER:

observeFlow()
    .buffer(10)
    .collect { ... }

For LATEST:

observeFlow()
    .conflate()
    .collect { ... }

which is just a shortcut for that same buffer operator.
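
For illustration, if I understand correctly, conflate() is just buffer with a conflated capacity, so the same thing could be written explicitly as:

observeFlow()
    .buffer(Channel.CONFLATED) // effectively what conflate() does
    .collect { ... }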

But I was not able to find anything that works the same as DROP. In short, DROP discards any value that arrives in the stream while the previous value hasn't been processed yet. And with Flow I am not even sure that it is possible at all.

Consider the following case:

observeFlow()
    .backpressureDrop() // non-existent operator, just for illustrative purposes
    .map { ... }
    .flatMapMerge { ... }
    .collect { ... }

So backpressureDrop should respect any work done further down the stream, even though that operator doesn't know anything about what is happening below (there is no explicit callback from the bottom, like the "request" method of an RxJava Subscriber). Therefore it doesn't seem possible. The operator should not pass through any event before the previous item has been collected.

Is there any ready-made operator that I am missing, or is there a straightforward way to implement something like this with the existing API?

Porphyrin asked 25/1, 2020 at 15:24 Comment(0)

We can build this using a Flow backed by a Rendezvous Channel.

When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.

A rendezvous channel has no buffer, so a consumer must already be suspended and waiting for the next element before an element can be sent to the channel. We can exploit this to drop values that can't be handed over immediately by using Channel.offer, which is a regular non-suspending function.

Channel.offer

Adds element into this queue if it is possible to do so immediately without violating capacity restrictions and returns true. Otherwise, it returns false immediately or throws exception if the channel isClosedForSend (see close for details).

Because channelFlow is buffered by default, we need to apply Flow<T>.buffer with a capacity of 0 downstream.

import kotlinx.coroutines.flow.*

/**
 * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
 * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and
 * waiting for the next element, the element is dropped.
 *
 * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
 */
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
    collect { offer(it) }
}.buffer(capacity = 0)

Here's an example of how a slow consumer can use this to drop elements.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Assumes the drop() extension defined above is in scope.
fun main() = runBlocking {
    flow {
        (0..100).forEach {
            emit(it)
            delay(100)
        }
    }.drop().collect {
        delay(1000)
        println(it)
    }
}

with the corresponding output:

0
11
21
31
41
51
61
71
81
91
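
As a side note, newer kotlinx.coroutines versions deprecate offer in favor of trySend, and expose Channel.RENDEZVOUS as the named constant for a capacity of 0. A sketch of the same operator with that API (the name dropWhenBusy is just a placeholder):

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.dropWhenBusy(): Flow<T> = channelFlow {
    // trySend returns a failed ChannelResult (and the item is dropped) when no collector is waiting.
    collect { trySend(it) }
}.buffer(capacity = Channel.RENDEZVOUS)
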
Denounce answered 8/2, 2020 at 17:42 Comment(0)

is there a straightforward way to implement something like this

Depends on your measure of straightforward. Here is how I would do it.

Backpressure translates to programmatic suspension and resumption in the coroutines world. For onBackpressureDrop, the downstream has to indicate it is ready for one item and suspend waiting for it, while the upstream should never wait for the downstream to be ready.

You have to consume the upstream in an unbounded manner and hand over items and terminal events to a downstream that is waiting for those signals.

package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
 : AbstractFlow<T>() {
    @ExperimentalCoroutinesApi
    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        coroutineScope {
            val consumerReady = AtomicBoolean()
            val producerReady = Resumable()
            val value = AtomicReference<T>()
            val done = AtomicBoolean()
            val error = AtomicReference<Throwable>()

            launch {
                try {
                    source.collect {
                        if (consumerReady.get()) {
                            value.set(it)
                            consumerReady.set(false)
                            producerReady.resume()
                        }
                    }
                    done.set(true)
                } catch (ex: Throwable) {
                    error.set(ex)
                }
                producerReady.resume()
            }

            while (true) {
                consumerReady.set(true)
                producerReady.await()

                val d = done.get()
                val ex = error.get()
                val v = value.getAndSet(null)

                if (ex != null) {
                    throw ex
                }
                if (d) {
                    break
                }

                collector.emit(v)
            }
        }
    }
}

Note: the Resumable helper comes from the hu.akarnokd.kotlin.flow package imported above; a rough sketch of what it does follows.
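
To make the handoff protocol concrete, here is a minimal, hypothetical stand-in for such a primitive (the library's real Resumable is more elaborate; SimpleResumable is just an illustrative name): await() suspends until resume() is called, and a resume() that arrives before await() is remembered once so the signal is not lost.

import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlinx.coroutines.suspendCancellableCoroutine

class SimpleResumable {
    // Holds either null (idle), READY (resume arrived first), or the parked Continuation.
    private val state = AtomicReference<Any?>(null)

    suspend fun await() {
        suspendCancellableCoroutine<Unit> { cont ->
            while (true) {
                val current = state.get()
                if (current === READY) {
                    // resume() already fired: consume the signal and continue immediately.
                    if (state.compareAndSet(current, null)) {
                        cont.resume(Unit)
                        break
                    }
                } else {
                    // Park this continuation until resume() is called
                    // (cancellation handling omitted for brevity).
                    if (state.compareAndSet(current, cont)) {
                        break
                    }
                }
            }
        }
    }

    fun resume() {
        while (true) {
            val current = state.get()
            when {
                current === READY -> return // already signalled
                current == null -> if (state.compareAndSet(null, READY)) return // remember the signal
                else -> if (state.compareAndSet(current, null)) {
                    @Suppress("UNCHECKED_CAST")
                    val parked = current as Continuation<Unit>
                    parked.resume(Unit) // wake the waiting coroutine
                    return
                }
            }
        }
    }

    private companion object {
        val READY = Any()
    }
}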

So let's walk through the implementation.

First, one needs 5 variables to hand over information between the collector of the upstream and the collector working for the downstream:

  • consumerReady indicates the downstream is ready for the next item
  • producerReady indicates the producer has stored the next item (or terminal signal) and the downstream can resume
  • value holds the upstream item ready for consumption
  • done indicates the upstream has ended
  • error indicates the upstream has failed

Next, we have to launch the collector for the upstream, because collect is suspending and would not let the downstream consumer loop run at all until completion. In this collector, we check if the downstream consumer is ready (via consumerReady) and, if so, store the current item, clear the readiness flag and signal its availability via producerReady. Clearing consumerReady prevents subsequent upstream items from being stored until the downstream itself indicates readiness again.

When the upstream ends or crashes, we set the done or error variables and indicate the producer has spoken.

After the launch { } part, we keep consuming the shared variables on behalf of the downstream collector.

The first thing in each round is to indicate we are ready for the next value, then wait for the producer side to signal that it has placed the next event in the shared variable(s).

Next, we read these variables. We eagerly complete or rethrow the error, and only as a last resort emit the upstream item to the downstream collector.
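
For completeness, a small wrapper (the extension name and the example function are mine, not part of the class above, and assume the extension lives in the same module as FlowOnBackpressureDrop) lets the operator sit anywhere in a chain, mirroring the question's hypothetical backpressureDrop():

import kotlinx.coroutines.flow.*

fun <T> Flow<T>.onBackpressureDrop(): Flow<T> = FlowOnBackpressureDrop(this)

// Usage sketch: items the upstream emits while map/collect below are still busy get dropped.
suspend fun example(source: Flow<Int>) {
    source
        .onBackpressureDrop()
        .map { it * 2 }          // heavy work from the question's map step would go here
        .collect { println(it) }
}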

Recurrence answered 26/1, 2020 at 11:28 Comment(4)
Thank you for the very detailed answer; I can say that yes, the idea of your solution is pretty straightforward :) However, I'd like to point out 2 things: 1) the implementation uses JVM Atomics, which makes it platform-specific, but using channels to communicate between coroutines would probably solve that; 2) am I right that this is a solution for collect with drop, and it does not apply to other operators in the chain? (Will continue the question in the next comment) – Porphyrin
Consider the case where the upstream emits values each second and the chain contains a map with a heavy computation that takes 3 seconds. Then even if we add this collect with drop at the end, the code inside map will still be executed for each item emitted by the source, right? – Porphyrin
And the "expected" behavior would be to write something like source.backpressureDrop().map().collect {} so that map is executed only for items which passed through the "backpressure drop" wall. (Hopefully my examples and questions are clear and make sense) – Porphyrin
1) Non-multithreaded platforms could mock out the Atomics with plain fields. 2) It is the same as with RxJava: place it where you want to keep upstream items from reaching the downstream. – Recurrence

From the comment made here by Anton Spaans, there is a way to emulate drop by using channelFlow.
The issue is that by default the channelFlow builder uses the BUFFER strategy and doesn't let you parameterize the capacity.
The capacity can be parameterized in ChannelFlowBuilder, but that API is internal and ChannelFlowBuilder itself is private.
Essentially, though, if you copy-paste the ChannelFlowBuilder implementation and create a class like this:

class BackPressureDropFlow<T>(private val source: Flow<T>) : AbstractFlow<T>() {

    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        ChannelFlowBuilder<T>({ source.collect { offer(it) } }, capacity = 0)
            .collect { collector.emit(it) }
    }
}

(or directly apply a similar solution as a transform).
Then it seems to work.
The main key here is to use capacity = 0, which means there is no buffer, so an item is only handed over while the downstream is suspended waiting for it; everything else is dropped by offer.
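
A usage sketch under the above assumptions (observeFlow() is the placeholder source from the question; BackPressureDropFlow and the copy-pasted ChannelFlowBuilder sit in the same module); map and collect only run for items that make it past the drop:

import kotlinx.coroutines.flow.*

suspend fun consume(observeFlow: () -> Flow<Int>) {
    BackPressureDropFlow(observeFlow())
        .map { it + 1 }          // heavy work would go here; it runs only for non-dropped items
        .collect { println(it) }
}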

Porphyrin answered 8/2, 2020 at 8:24 Comment(2)
.buffer(capacity = 0) is not forbidden at runtime. From the docs: capacity - type/capacity of the buffer between coroutines. Allowed values are the same as in the Channel(...) factory function: BUFFERED (by default), CONFLATED, RENDEZVOUS, UNLIMITED or a non-negative value indicating an explicitly requested size. – Denounce
@KevinCianfarini yes, removing the part about "buffer(capacity = 0)". It is not restricted; it just didn't work the way I wanted. It suspends but does not drop values. I see you've been trying to make an edit to my answer, and your solution seems to work, so I suggest you post your edit as a separate answer instead. Cheers – Porphyrin
