Backpressure mechanism in Spring Web-Flux
I'm a beginner in Spring WebFlux. I wrote a controller as follows:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono");
    }
}

I know that one of the benefits of reactive programming is backpressure, which can balance the rate of the producer and the consumer. I want to understand how the backpressure mechanism works in Spring WebFlux.

Salute answered 9/9, 2018 at 13:4 Comment(4)
how to have backpressure or how to deal with it? you already have built-in mechanisms to deal with it, so do you want to find out about them?Misinform
if so, check out this pageMisinform
I didn't know that I already have it by default, so maybe how to deal with it is better. I want to know about the mechanism and its configuration in Spring Web-Flux.Salute
@Andrew Tobilko RxJava is different from Reactor which is under the hood of spring WebFluxSkiagraph

Backpressure in WebFlux

To understand how backpressure works in the current implementation of the WebFlux framework, we have to recap the transport layer used by default. Communication between a browser and a server (and usually server-to-server communication as well) is done over a TCP connection, and WebFlux uses that same transport. Then, to grasp the term backpressure control, we have to recall what backpressure means from the perspective of the Reactive Streams specification:

The basic semantics define how the transmission of stream elements is regulated through back-pressure.

From that statement, we may conclude that in Reactive Streams backpressure is a mechanism that regulates demand by transmitting (notifying) how many elements the recipient can consume. And here we hit a tricky point: TCP works with a byte abstraction rather than a logical-element abstraction. What we usually mean by backpressure control is controlling the number of logical elements sent to or received from the network. Even though TCP has its own flow control (see the meaning here and the animation there), that flow control operates on bytes rather than on logical elements.
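To make "demand in logical elements" concrete, here is a minimal sketch using only the JDK's java.util.concurrent.Flow API (no WebFlux or Reactor involved; all class names are illustrative). The publisher emits exactly as many elements as the subscriber has requested, which is backpressure expressed in elements rather than bytes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative sketch: element-level demand with the plain JDK Flow API.
public class DemandDemo {

    // A synchronous publisher of the integers 0..count-1 that emits
    // only as many elements as have been requested so far.
    static class RangePublisher implements Flow.Publisher<Integer> {
        final int count;
        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> sub) {
            sub.onSubscribe(new Flow.Subscription() {
                int next = 0;
                @Override
                public void request(long n) {
                    // Backpressure in logical elements: emit at most n items.
                    for (long i = 0; i < n && next < count; i++) {
                        sub.onNext(next++);
                    }
                    if (next == count) sub.onComplete();
                }
                @Override
                public void cancel() { next = count; }
            });
        }
    }

    public static final List<Integer> received = new ArrayList<>();

    public static void main(String[] args) {
        new RangePublisher(100).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(3); }
            @Override public void onNext(Integer item) { received.add(item); } // never asks for more
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        System.out.println(received); // [0, 1, 2] -- only what was requested
    }
}
```

Even though the source holds 100 elements, only the 3 requested ones are delivered; nothing about bytes or TCP windows appears in this contract.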

In the current implementation of the WebFlux module, backpressure is regulated by the transport's flow control, which does not expose the real demand of the recipient. To see the interaction flow, consider the following diagram:

[Diagram: Service A (sender) streams data over TCP to Service B (consumer); on each side WebFlux sits between the network and the business logic, converting between logical elements and bytes, while TCP flow control acknowledges bytes]

For simplicity, the above diagram shows the communication between two microservices, where the left one sends a stream of data and the right one consumes that stream. The following numbered list briefly explains that diagram:

  1. The WebFlux framework takes care of converting logical elements to bytes and back, and of transferring them to and receiving them from TCP (the network).
  2. This is the start of long-running processing of an element; the service requests the next element only once that job is completed.
  3. While there is no demand from the business logic, WebFlux enqueues the bytes that come from the network without acknowledging them.
  4. Because of the nature of TCP flow control, Service A may still send data to the network.

As we may notice from the diagram above, the demand exposed by the recipient differs from the demand of the sender (demand here is in logical elements). The two demands are isolated from each other: backpressure works only for the WebFlux <-> business logic (service) interaction, and barely exposes itself for the Service A <-> Service B interaction. All this means that backpressure control in WebFlux is not as fair as we might expect.

But I still want to know how to control backpressure

If we still want this (unfair) control of backpressure in WebFlux, we can do it with Project Reactor operators such as limitRate(). The following example shows how we may use that operator:

@PostMapping("/tweets")
public Mono<Void> postAllTweets(@RequestBody Flux<Tweet> tweetsFlux) {
    return tweetService.process(tweetsFlux.limitRate(10))
                       .then();
}

As the example shows, the limitRate() operator lets us define the number of elements to prefetch at once. Even if the final subscriber requests Long.MAX_VALUE elements, limitRate splits that demand into chunks and does not allow more than the configured amount to be requested at once. We can do the same on the element-sending side:

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
    
    return tweetService.retrieveAll()
                       .limitRate(10);
}

The example above shows that even if WebFlux requests more than 10 elements at a time, limitRate() throttles the demand to the prefetch size and prevents more than the specified number of elements from being consumed at once.
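The chunking behaviour can be simulated in plain Java (this is not Reactor; LimitRateSketch and its names are made up for illustration): even when downstream demand is Long.MAX_VALUE, the source only ever sees requests of at most the prefetch size. (In real Reactor, limitRate by default also replenishes early, once 75% of the prefetch has been consumed.)

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative simulation of what limitRate(10) does to demand.
public class LimitRateSketch {

    public static final List<Integer> upstreamRequests = new ArrayList<>();

    static void consume(long downstreamDemand, int prefetch, int totalElements) {
        int emitted = 0;
        while (emitted < totalElements && emitted < downstreamDemand) {
            long remaining = Math.min(downstreamDemand - emitted, (long) (totalElements - emitted));
            int chunk = (int) Math.min(prefetch, remaining);
            upstreamRequests.add(chunk); // the request the source actually sees
            emitted += chunk;            // pretend the source emitted the chunk
        }
    }

    public static void main(String[] args) {
        // Downstream demands "everything", the source has 25 elements, prefetch is 10.
        consume(Long.MAX_VALUE, 10, 25);
        System.out.println(upstreamRequests); // [10, 10, 5]
    }
}
```

The unbounded demand never reaches the source; it is rewritten into bounded chunks, which is exactly the throttling effect described above.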

Another option is to implement our own Subscriber or extend BaseSubscriber from Project Reactor. For instance, the following is a naive example of how we may do that:

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {

    int consumed;
    final int limit = 5;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(limit);
    }
    
    @Override
    protected void hookOnNext(T value) {
        // do business logic there 

        consumed++;
        
        if (consumed == limit) {
            consumed = 0;
            
            request(limit);
        }
    }
}
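The same request-a-batch-then-replenish pattern can be demonstrated with nothing but the JDK Flow API (an illustrative sketch, not Reactor's BaseSubscriber): the subscriber requests `limit` elements up front and asks for the next full batch only after consuming one, while the source records every request() it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative JDK-Flow rendition of the batching subscriber idea.
public class BatchingSubscriberDemo {

    public static final List<Long> requestLog = new ArrayList<>();

    static class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final int limit = 5;
        int consumed;
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(limit);                 // initial batch
        }

        @Override
        public void onNext(Integer value) {
            // business logic would go here
            if (++consumed == limit) {
                consumed = 0;
                subscription.request(limit);  // replenish one full batch
            }
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        // A tiny synchronous source of the integers 0..11 that logs each request().
        Flow.Publisher<Integer> source = sub -> sub.onSubscribe(new Flow.Subscription() {
            int next = 0;
            boolean done = false;

            @Override
            public void request(long n) {
                requestLog.add(n);
                for (long i = 0; i < n && next < 12; i++) sub.onNext(next++);
                if (next == 12 && !done) { done = true; sub.onComplete(); }
            }

            @Override public void cancel() { next = 12; }
        });

        source.subscribe(new BatchingSubscriber());
        System.out.println(requestLog); // [5, 5, 5]
    }
}
```

The source sees demand only in batches of 5, regardless of how many elements it could produce, mirroring what the BaseSubscriber above achieves inside Reactor.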

Fair backpressure with RSocket Protocol

To achieve logical-element backpressure across network boundaries, we need a protocol that supports it. Fortunately, there is one: the RSocket protocol. RSocket is an application-level protocol that transfers the real demand across network boundaries. There is an RSocket-Java implementation of the protocol that allows setting up an RSocket server; for server-to-server communication, the same RSocket-Java library provides a client implementation as well. To learn more about how to use RSocket-Java, please see the examples here. For browser-server communication, there is an RSocket-JS implementation which allows wiring streaming communication between a browser and a server over WebSocket.

Known frameworks on top of RSocket

Nowadays there are a few frameworks built on top of the RSocket protocol.

Proteus

One of these frameworks is Proteus, which offers full-fledged microservices built on top of RSocket. Proteus is also well integrated with the Spring framework, so we can now achieve fair backpressure control (see the examples there).

Further reading

Besides answered 9/9, 2018 at 13:55 Comment(19)
Even though the answer is really good, I think OP is asking about programmatic (more high-level, API-related) approaches to deal with backpressure.Misinform
Let me add it. Just a momentBesides
@OlehDokuka Thanks for your good explanation. Could you tell me whether extending BaseSubscriber is a common approach, or is Spring WebFlux's default behavior better?Salute
It is better to use the framework's control. However, you may do some throttling with limitRate, and only if you need fine-grained control should you extend BaseSubscriber. In many cases, it depends on what you really want to achieve. I would suggest you run a conversation at the Reactor gitter chat room and have a normal chat there -> gitter.im/reactor/reactorBesides
This is a starting of long-running processing of the element which requests for next elements once the job is completed. Could you rephrase it please? Which job are you speaking about?Skiagraph
Why does service B request 1 element while service A gets a request for 10 elements?Skiagraph
What is the meaning of the acknowledge messages on the diagram? Why does only service A acknowledge service B?Skiagraph
Also, I can't understand whether it is possible to force the producer and consumer to work in parallel. From the text I understood that the consumer handles a chunk of elements and only afterwards requests the next element. So at any one time only the consumer or the producer (or neither of them) is working.Skiagraph
@Skiagraph > This is a starting of long-running processing of the element which requests for next elements once the job is completed. It basically means that there is some generic work that a generic service has to do. The next request to the Publisher can come from the service (or a Subscriber) only once that work has completed. To summarize: "job" in this context means some generic work (nothing specific).Besides
@Skiagraph >Why service B requests 1 elements but service A gets request of 10 elements As I mentioned, there is no proper request coordination in WebFlux. All backpressure happens over TCP flow control (brianstorti.com/tcp-flow-control and youtube.com/watch?v=EHaSQBOrYDI). Reactor-netty has its own buffers, and the underlying TCP has its own acknowledgment mechanisms, so after all it is possible to write 65K bytes, which could be 10K logical messages, while in fact our recipient may take a long time to process even one.Besides
@Skiagraph > What is the meaninf of acknowledge messages on the diagram? Why does onle service A aknowledges service B see TCP flow control mechanism -> brianstorti.com/tcp-flow-controlBesides
@Skiagraph >Also I can't understand whether it possible to force producer and consumer work in parrallel? According the text I understood that consumer handled chunk of elements and only after it requests the next element. So at one time works only whether consumer or producer or neither of them They are working in parallel. All Reactive Streams are about non-blocking, asynchronous element processing.Besides
All reactive streams are about non-blocking, asynchronous elements processing but it contradicts schema you provided.Skiagraph
The schema mentioned above is a kind of sequence diagram, which does not pretend to represent asynchronous/non-blocking behavior but exposes how events appear over time.Besides
@Oleh Consumer requested 10 elements and said to WebFlux to notify after response. So Consumer won't work until notification. How can it be in parallel?Skiagraph
The consumer will not work until it gets the first element (nor blocking the thread nor spinning CPU -> reactivemanifesto.org/glossary#Non-Blocking). Since Stream means that elements could be emitted one by one, it means that once the producer can produce at least one item, the consumer will receive it and start processing it. While the processing on the consumer side is happening the producer is free to produce a few more, since it is clear that ten elements were requested and the consumer has enough capacity to handle 10.Besides
How do you handle the server-side limit that was handled by libraries like Hystrix in the past? @OlehDokuka I was trying an approach like this one github.com/vladimir-bukhtoyarov/bucket4j/discussions/148Bilection
I'm starting to realize that WebFlux does nothing about backpressure control when transferring data to the client. It is all done by the transport layer, is that right?Grivet
@GuofengPan correct. look at rsocket.io if you need real backpressure over the network. docs.spring.io/spring-framework/docs/current/reference/html/…Besides

© 2022 - 2024 — McMap. All rights reserved.