Prevent flux.bufferTimeout from overflowing after timeout

I'm relatively new to reactive programming and Reactor. I have a situation in which I want to bufferTimeout values in my stream while keeping it under my control (no unbounded request), so I can manually request batches of values.

The following sample illustrates it:

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import static java.time.Duration.ofMillis;
import static reactor.core.scheduler.Schedulers.parallel;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

// Simulated producer: emits a value whenever one is offered to the queue.
Flux<Object> flux = Flux.generate(sink -> {
    try {
        sink.next(queue.poll(10, TimeUnit.DAYS));
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // Don't request unbounded
    }

    @Override
    protected void hookOnNext(List<Object> value) {
        System.out.println(value);
    }
};

flux.subscribeOn(parallel())
        .log()
        .bufferTimeout(10, ofMillis(200))
        .subscribe(subscriber);

subscriber.request(1);

// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);

// Wait for timeout, expect [1, 2, 3, 4, 5] to be printed
Thread.sleep(500); 

// Offer more values
queue.offer(6);
queue.offer(7);
queue.offer(8);
queue.offer(9);
queue.offer(10);
Thread.sleep(1000);

This is the output:

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[ INFO] (main) request(10)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-1) onNext(4)
[ INFO] (parallel-1) onNext(5)
[1, 2, 3, 4, 5]
[ INFO] (parallel-1) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-1) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-1) onNext(10)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests

I actually expected this: the buffer subscriber requests 10 values upstream, and the upstream, which is unaware of the timeout, will produce all of them regardless. Since the one-and-only downstream request was fulfilled once the timeout fired, the values offered later are still produced and overflow.

I wonder if it's possible to prevent the remaining values from being produced after the timeout fires, or to buffer them without losing control. I have tried the following (see the sketch after this list):

  • limitRate(1) before bufferTimeout, trying to make the buffer request values "on demand". It does request one-by-one, but 10 times, because the buffer asked for 10 values.
  • onBackpressureBuffer(10), since the problem is basically the definition of backpressure, if I got it right. The idea was to buffer the values that overflow the timed-out request, but this operator requests an unbounded amount upstream, which I'd like to avoid.
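
Roughly, the pipelines for these two attempts look like this (sketches only; the exact operator placement, especially for onBackpressureBuffer, may differ from what I actually ran):

// Attempt 1: limitRate(1) before bufferTimeout
flux.subscribeOn(parallel())
        .limitRate(1)
        .bufferTimeout(10, ofMillis(200))
        .subscribe(subscriber);

// Attempt 2: onBackpressureBuffer(10), here placed before bufferTimeout to
// absorb the raw values that overflow the timed-out request
flux.subscribeOn(parallel())
        .onBackpressureBuffer(10)
        .bufferTimeout(10, ofMillis(200))
        .subscribe(subscriber);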

It looks like I'll have to write my own bufferTimeout implementation, but I'm told that writing publishers is hard. Am I missing something? Or am I doing reactive wrong?

Siva answered 11/1, 2019 at 17:32 Comment(6)
I'm wondering whether it's the simulation of the producer (Flux.generate) that is causing confusion here. What would your real producer look like? Are you really generating data from your code, or actually getting it from an external source like a database? – Determination
The source is Kafka and I'm using reactor-kafka to connect to it. With this lib, data is fetched periodically from Kafka. I can reproduce this issue by setting a long fetch period, manually sending one message, waiting for the timeout, then sending another message. – Siva
Is the example I gave bad? Should I just put the real case in the question instead? – Siva
The example isn't bad. It's just not clear where the data for the flux comes from. Are you generating the elements completely locally, or getting them from somewhere else? Seeing the real case might help. – Determination
I have tested locally and remotely, with both Kafka mocks and a real Kafka. As the example shows, I can reproduce the issue consistently even without Kafka. Looks like I have to improve how I handle the stream. – Siva
If your data is coming from Kafka, look at the reactor-kafka library source code to get an idea of how to read from Kafka as a flux. – Determination
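
(For reference, the basic shape of reading a topic as a Flux with reactor-kafka looks roughly like this; the broker address, group id, topic name and deserializers below are placeholders:)

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
        .subscription(Collections.singleton("my-topic"));

Flux<ReceiverRecord<String, String>> records = KafkaReceiver.create(options).receive();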

Solved it by implementing my own subscriber:

https://gist.github.com/hossomi/5edf60acb534a16c025e12e4e803d014

It requests only the needed number of values and buffers any values received while there is no active request. The internal buffer is unbounded, so you might want to use it with caution or change that.

It's most likely not as reliable as a standard Reactor subscriber, but it works for me. Suggestions are welcome!
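
A rough sketch of the idea (this is not the exact code in the gist; names and details here are illustrative): a subscriber that does the batching itself can keep upstream demand bounded to what the current batch can still hold, and values that arrive after a timed-out batch simply wait for the next one instead of overflowing.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

// Sketch: a subscriber that batches values itself, requesting upstream only
// what the current batch can still accept.
// (A real implementation would also shut down the scheduler and handle
// completion and errors.)
class BatchingSubscriber<T> extends BaseSubscriber<T> {

    private final int maxSize;
    private final Duration timeout;
    private final Consumer<List<T>> consumer;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    private List<T> batch = new ArrayList<>();
    private ScheduledFuture<?> timer;
    private int pending; // values requested upstream but not yet received

    BatchingSubscriber(int maxSize, Duration timeout, Consumer<List<T>> consumer) {
        this.maxSize = maxSize;
        this.timeout = timeout;
        this.consumer = consumer;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // No initial demand; wait for an explicit requestBatch().
    }

    // Request upstream only the values still missing to fill one batch,
    // and start the timeout for this batch.
    public synchronized void requestBatch() {
        timer = scheduler.schedule(this::flush, timeout.toMillis(), TimeUnit.MILLISECONDS);
        int missing = maxSize - pending - batch.size();
        if (missing > 0) {
            pending += missing;
            request(missing);
        }
    }

    @Override
    protected synchronized void hookOnNext(T value) {
        pending--;
        batch.add(value);
        if (batch.size() >= maxSize) {
            flush();
        }
    }

    // Emit whatever accumulated so far, either on timeout or on a full batch.
    // Values arriving after a timed-out flush are kept for the next batch.
    private synchronized void flush() {
        if (timer != null) {
            timer.cancel(false);
        }
        if (!batch.isEmpty()) {
            consumer.accept(batch);
            batch = new ArrayList<>();
        }
    }
}

Used roughly like the original sample:

BatchingSubscriber<Object> subscriber =
        new BatchingSubscriber<>(10, ofMillis(200), System.out::println);
flux.subscribeOn(parallel()).subscribe(subscriber);
subscriber.requestBatch();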

Siva answered 18/1, 2019 at 13:16 Comment(0)
