I'm relatively new to reactive programming and Reactor. I want to apply bufferTimeout to the values
in my stream while keeping demand under my control (no unbounded request), so that I can manually request batches of values.
The following sample illustrates it:
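import static java.time.Duration.ofMillis;
import static reactor.core.scheduler.Schedulers.parallel;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
Flux<Object> flux = Flux.generate(sink -> {
    try {
        // Block until a value is offered to the queue
        sink.next(queue.poll(10, TimeUnit.DAYS));
    }
    catch (InterruptedException e) {
        // Restore the interrupt flag and signal the error instead of swallowing it
        Thread.currentThread().interrupt();
        sink.error(e);
    }
});
BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // Don't request unbounded
    }
    @Override
    protected void hookOnNext(List<Object> value) {
        System.out.println(value);
    }
};
flux.subscribeOn(parallel())
    .log()
    .bufferTimeout(10, ofMillis(200))
    .subscribe(subscriber);
// Manually request a single buffer
subscriber.request(1);
// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);
// Wait for timeout, expect [1, 2, 3, 4, 5] to be printed
Thread.sleep(500);
// Offer more values
queue.offer(6);
queue.offer(7);
queue.offer(8);
queue.offer(9);
queue.offer(10);
Thread.sleep(1000);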
This is the output:
[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[ INFO] (main) request(10)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-1) onNext(4)
[ INFO] (parallel-1) onNext(5)
[1, 2, 3, 4, 5]
[ INFO] (parallel-1) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-1) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-1) onNext(10)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
I actually expected this, because as I understand it the buffer subscriber requests 10 values upstream, and the upstream is unaware of the timeout and will produce all of them regardless. Since my one and only request was fulfilled when the timeout fired, the values offered later are still produced and overflow.
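To make that reasoning concrete, here is a toy plain-Java model of the demand bookkeeping as I understand it. It is only a sketch under my own assumptions, not Reactor's actual implementation: the class BufferDemandModel and all of its methods are hypothetical names made up for illustration, and the timeout is triggered by hand instead of by a timer.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the demand bookkeeping described above; NOT Reactor's real code. */
public class BufferDemandModel {
    private final int batchSize;
    private long downstreamDemand;    // buffers the subscriber has requested
    private long upstreamOutstanding; // items requested upstream but not yet delivered
    private final List<Integer> buffer = new ArrayList<>();
    private final List<String> events = new ArrayList<>();

    public BufferDemandModel(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Downstream asks for n buffers; the model asks upstream for n * batchSize items. */
    public void request(long n) {
        downstreamDemand += n;
        upstreamOutstanding += n * batchSize;
        events.add("request(" + n * batchSize + ")");
    }

    /** Upstream delivers a value only while it still has outstanding demand. */
    public boolean onNext(int value) {
        if (upstreamOutstanding == 0) {
            return false;
        }
        upstreamOutstanding--;
        buffer.add(value);
        if (buffer.size() == batchSize) {
            flush();
        }
        return true;
    }

    /** Simulates the timer firing: a non-empty partial buffer is flushed. */
    public void onTimeout() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    /** Emits the buffer if downstream demand is left, otherwise records an overflow. */
    private void flush() {
        if (downstreamDemand > 0) {
            downstreamDemand--;
            events.add("emit" + buffer);
        } else {
            events.add("overflow" + buffer);
        }
        buffer.clear();
    }

    public List<String> events() {
        return events;
    }

    /** Replays the scenario from the sample: one request, two partial batches. */
    public static List<String> replayScenario() {
        BufferDemandModel model = new BufferDemandModel(10);
        model.request(1);
        for (int i = 1; i <= 5; i++) model.onNext(i);
        model.onTimeout(); // first timeout uses up the only downstream request
        for (int i = 6; i <= 10; i++) model.onNext(i);
        model.onTimeout(); // second timeout finds no downstream demand
        return model.events();
    }

    public static void main(String[] args) {
        replayScenario().forEach(System.out::println);
    }
}
```

Running main prints request(10), then emit[1, 2, 3, 4, 5], then overflow[6, 7, 8, 9, 10]. In this model the overflow comes from the 5 items still outstanding upstream after the first timeout, which matches what I see in the log.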
I wonder if it's possible to prevent the remaining values from being produced after the timeout fires, or to buffer them without losing control. I have tried:
- limitRate(1) before bufferTimeout, trying to make the buffer request values "on demand". It does request one-by-one, but 10 times, because the buffer asked for 10 values.
- onBackpressureBuffer(10), as the problem is basically the definition of backpressure, if I got it right. I was trying to buffer the overflowing values from the timed-out request, but this requests unbounded values, which I'd like to avoid.
It looks like I'll have to write my own bufferTimeout implementation, but I'm told that writing publishers is hard. Am I missing something? Or am I doing reactive wrong?