Spring WebFlux 5.3.0 - WebClient.exchangeToMono()

I've just upgraded to Spring WebFlux 5.3.0 and noticed that the WebClient.exchange() method is now deprecated in favor of the new methods .exchangeToMono() and .exchangeToFlux().

I had this code:

webClient
   .method(request.method)
   .uri(request.path)
   .body(request.bodyToMono<ByteArray>())
   .exchange()
   .flatMap { response ->
      ServerResponse
         .status(response.statusCode())
         .headers { it.addAll(response.headers().asHttpHeaders()) }
         .body(response.bodyToMono<ByteArray>())
   }

I had to refactor it into this:

   .exchangeToMono { response ->
      ServerResponse
         .status(response.statusCode())
         .headers { it.addAll(response.headers().asHttpHeaders()) }
         .body(response.bodyToMono<ByteArray>())
   }

However, .exchangeToMono() apparently calls .releaseIfNotConsumed(), which releases the unprocessed response body and effectively makes the server return an empty body.

So I had to refactor my code further:

   .exchangeToMono { response ->
      response.bodyToMono<ByteArray>()
         .defaultIfEmpty(ByteArray(0))
         .flatMap { body ->
            ServerResponse
               .status(response.statusCode())
               .headers { it.addAll(response.headers().asHttpHeaders()) }
               .bodyValue(body)
         }
   }

As far as I understand, .exchange() allows my proxy server to transmit the response body without actually processing it, while .exchangeToMono() forces me to process (buffer?) it. Is this correct?

If so, what are the implications? Should I be okay with the change, or should I tweak the code somehow in order to make it transmit the response body without processing it? How would I do that?

==========

tl;dr What is the practical difference between passing .body(response.bodyToMono()) and .bodyValue(body)?

Lowman answered 2/11, 2020 at 17:59 Comment(1)
Created an issue: github.com/spring-projects/spring-framework/issues/26023 - Lowman
T
42

After reading through the change and trying to understand your question, I'll have a go at answering it. I'm not at all sure this is the correct answer; I'm making some logical assumptions based on what I know about Reactor, WebFlux, and WebClient.

Ever since WebClient was released, the main workhorse was supposed to be retrieve(), which provides a simple but stable API on top of a fully asynchronous client.

The problem was that most people were used to working with the ResponseEntity objects returned by the older RestTemplate (now in maintenance mode), so they turned to the exchange() function instead.

But this is where the problem lies. When you gain access to the response, you also take on a responsibility: you are obligated to consume it so that the underlying TCP connection can be closed or reused. This usually means reading the headers and the body, after which the connection can be released.

If you don't consume the response, the connection stays open, and the result is a memory leak.

Spring solved this by providing functions like response#bodyToMono and response#bodyToFlux, which consume the body and then close the response (which in turn releases the connection).

But it turns out it was quite easy (since developers are crafty bastards) to write code that never consumed the response, leaving dangling TCP connections.

webClient.get()
    .uri( ... )
    .exchange()
    .flatMap(response -> {

        // This is just an example: here I simply re-emit the response,
        // which means the connection stays open until I actually consume
        // the body. I could pass this response all over my application
        // and never consume it, and the connection would stay open the
        // whole time: a potential memory leak.

        return Mono.just(response);
    });

The new exchangeToMono implementation basically forces you to consume the body in favour of avoiding memory leaks. If you want to work on the raw response, you will be forced to consume the body.
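
To make that concrete, here is a deliberately simplified, synchronous model in plain Java. It is not WebFlux code: `ScopedExchangeDemo`, `Body`, and `exchangeTo` are hypothetical names made up for this sketch, and the real `exchangeToMono()` does the equivalent work asynchronously. The only point it tries to illustrate is that a body which was not read inside the callback is released once the callback has produced its result, so a response handed out of the callback arrives empty.

```java
import java.util.function.Function;

// Hypothetical, simplified model of a callback-scoped exchange (not Spring code).
final class ScopedExchangeDemo {

    // Stand-in for a response body that holds a resource until read or released.
    static final class Body {
        private String data;
        private boolean consumed;

        Body(String data) {
            this.data = data;
        }

        // Reading hands over the content and marks the body as consumed.
        String read() {
            consumed = true;
            String result = data;
            data = "";
            return result;
        }

        // Mirrors the idea behind releaseIfNotConsumed(): drop unread content.
        void releaseIfNotConsumed() {
            if (!consumed) {
                data = "";
            }
        }
    }

    // Runs the handler, then releases whatever the handler left unread.
    static <T> T exchangeTo(Function<Body, T> handler) {
        Body body = new Body("payload");
        try {
            return handler.apply(body);
        } finally {
            body.releaseIfNotConsumed();
        }
    }

    public static void main(String[] args) {
        // Body read inside the callback: the content survives.
        String readInside = exchangeTo(Body::read);
        // Body handed out unread: it is released before anyone can read it.
        Body handedOut = exchangeTo(body -> body);

        System.out.println("read inside: '" + readInside + "'");
        System.out.println("read outside: '" + handedOut.read() + "'");
    }
}
```

In this model, the second call corresponds to the empty body seen in the question: the handler returned something that still referred to the unread body, and the release step ran before anything downstream could read it.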

So let's talk about your example and your needs.

You basically just want to proxy a request from one server to another. You do actually consume the body; you just don't do it inside the flatMap, in close proximity to the WebClient call.

.exchange()
   .flatMap { response ->
      ServerResponse
         .status(response.statusCode())
         .headers { it.addAll(response.headers().asHttpHeaders()) }
         // Here you declare that you want to consume the body, but it isn't
         // consumed right here; it's not consumed until much later.
         .body(response.bodyToMono<ByteArray>())
   }

Here in your code you are returning a ServerResponse, but you always have to keep one thing in mind: nothing happens until you subscribe. You are basically passing along a ServerResponse whose body you haven't consumed yet. You have only declared that when the server needs the body, it will have to consume the body of the upstream response to get the new body.

Think of it this way, you are returning a ServerResponse that only contains declarations about what we want in it, not what is actually in it.

As this gets returned from the flatMap, it travels all the way out of the application until it is written as a response to the open TCP connection we have with the client.

Only then is the response built, and that is when the original response from the WebClient is consumed and closed.

So your original code does work, because you do consume the WebClient response; you just don't do it until you write the response to the calling client.
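
The "nothing happens until you subscribe" idea can be sketched in plain Java with a `Supplier`. This is an analogy, not WebFlux code: `LazyBodyDemo` and `DeferredResponse` are hypothetical names invented here, and the `Supplier` merely stands in for the `Mono` that is passed to `.body(...)`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical analogy for a lazily consumed body (not Spring code).
final class LazyBodyDemo {

    // Stand-in for a ServerResponse: it stores a recipe for the body, not the bytes.
    static final class DeferredResponse {
        final int status;
        final Supplier<byte[]> body;

        DeferredResponse(int status, Supplier<byte[]> body) {
            this.status = status;
            this.body = body;
        }
    }

    // Returns the order in which things happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Declaring the upstream body does not read anything yet.
        Supplier<byte[]> upstreamBody = () -> {
            log.add("upstream body consumed");
            return "hello".getBytes(StandardCharsets.UTF_8);
        };

        // Building the response only wires the recipe in.
        DeferredResponse response = new DeferredResponse(200, upstreamBody);
        log.add("response declared");

        // Only when the response is written is the upstream body actually read.
        byte[] written = response.body.get();
        log.add("wrote " + written.length + " bytes");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log shows the declaration step first and the upstream read only at write time, which is the ordering described above for the original `.exchange()` code.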

What you were doing was not inherently wrong. It's just that an API shaped this way increases the risk of people using it incorrectly, and then memory leaks can happen.

I hope this answers at least some of your questions. I was mostly writing down my interpretation of the change.

Tic answered 3/11, 2020 at 12:38 Comment(7)
I agree with this answer. As for the last part of the question (Mono<ByteArray> vs. ByteArray): in each case, the full response body will be read and buffered in memory in a single ByteArray instance, so in that sense they're equivalent. If you want to be more memory efficient, dealing with Flux<DataBuffer> is much better. - Osmo
@BrianClozel Coming from someone who works at Pivotal, I feel honored. Thank you :) - Tic
So basically I will have to process (buffer) the response body anyway, be it inside .exchangeToMono() or immediately before writing to the TCP connection? So there is no fundamental difference here, and I shouldn't try to stick with passing a Mono, right? I thought maybe I could create some kind of stream pipeline directly from the other server to my client, with the bytes of the body flowing freely, and with my proxy server being concerned only with the direction of the pipe, the headers, and so on... - Lowman
A Mono<ByteArray> works with one full array of bytes, so if you choose to use a Mono, your application will (when it needs the body) read the entire byte array into memory when the Mono is subscribed to. If, as Brian said, you go with a Flux<DataBuffer>, then when the body is needed the Flux reads some bytes into a buffer and passes them along, then the next bytes, and so on until the body has been consumed. The risk of going OOM is much lower then. - Tic
But yes, you have understood it right. My understanding is that you weren't doing anything wrong before; it's just that Spring is now stricter about where we need to process the response. - Tic
How were you able to unit test the function written inside exchangeToMono(function) while mocking the behavior of exchangeToMono(function)? - Pipsissewa
Please don't ask new questions in comments. - Tic
G
1

As a safe workaround for backward compatibility:

exchangeToMono(rs -> Mono.just(rs.mutate().build()))

You still receive a ClientResponse downstream, with all data on the original ClientResponse consumed / released during mutate().

Don't forget to release the mutated one.

UPDATE:

This is not the right way to do it, since it's not efficient, but if you need ad-hoc testing in a huge legacy reactive pipeline, it works.

Gorton answered 26/2, 2021 at 16:25 Comment(1)
I don't know why you got downvoted, but this answer helped me get rid of the deprecated .exchange() in a context where I needed to return a Mono<ClientResponse> to create a ClientResponse object. Since I'm not returning a DTO, .retrieve() did not help at all. - Imperative
J
1

I found that this approach works with memory-efficient DataBuffers. onStatus or onRawStatus is crucial for non-success statuses, as it replaces the default error handlers.

   .retrieve()
   // ignore all statuses so no error is thrown when the status is >= 400 in toEntityFlux()
   .onRawStatus(status -> true, response -> Mono.empty())
   .toEntityFlux(DataBuffer.class)
   .flatMap(entity -> ServerResponse
       .status(entity.getStatusCode())
       .body(entity.getBody(), DataBuffer.class))
Job answered 3/6, 2022 at 12:14 Comment(0)
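
The memory argument from the comments above (one `Mono<ByteArray>` versus a `Flux<DataBuffer>`) can be illustrated with plain `java.io` streams. This is only an analogy under assumed names: `RelayDemo`, `relayBuffered`, and `relayChunked` are made up for this sketch and are not part of Spring. Each method relays bytes from a source to a sink and reports the largest number of bytes it held at one time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical analogy for buffered vs. chunked relaying (not Spring code).
final class RelayDemo {

    // Analogous to Mono<ByteArray>: the whole body is held in memory at once.
    static int relayBuffered(InputStream in, OutputStream out) {
        try {
            byte[] all = in.readAllBytes(); // requires Java 9 or later
            out.write(all);
            return all.length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Analogous to Flux<DataBuffer>: only one chunk is held at a time.
    static int relayChunked(InputStream in, OutputStream out, int chunkSize) {
        try {
            byte[] chunk = new byte[chunkSize];
            int peak = 0;
            int read;
            while ((read = in.read(chunk)) != -1) {
                out.write(chunk, 0, read);
                peak = Math.max(peak, read);
            }
            return peak;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = new byte[10_000];

        int bufferedPeak = relayBuffered(
                new ByteArrayInputStream(payload), new ByteArrayOutputStream());
        int chunkedPeak = relayChunked(
                new ByteArrayInputStream(payload), new ByteArrayOutputStream(), 1024);

        System.out.println("buffered peak bytes: " + bufferedPeak);
        System.out.println("chunked peak bytes: " + chunkedPeak);
    }
}
```

Both methods deliver the same bytes; the difference is that the buffered variant's memory use grows with the body size, while the chunked variant's stays bounded by the chunk size.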
