WebClient - how to get request body?
K

7

20

I've started using WebClient and I'm adding logging of request/response using filter method:

WebClient.builder()
    .baseUrl(properties.getEndpoint())
    .filter((request, next) -> {
        // logging
        request.body()
    })
    .build();

I can access the URL, HTTP method, and headers, but I can't get the raw request body, because the request's body() method returns a BodyInserter (BodyInserter<?, ? super ClientHttpRequest> body()).

How can I convert a BodyInserter to a String representation of the request body? Alternatively, how can I properly log the whole request/response while still being able to hash any credentials in it?

Knitting answered 19/7, 2019 at 8:37 Comment(8)
To my knowledge this is not possible, because WebFlux is a wrapper around a Netty client and a body inserter is just a lambda that is executed when the actual request is made.Solfeggio
Any sensible alternative then? There has to be a way of relatively easy request/response logging with possibility of customisationKnitting
Why does there have to be? I have never felt the need to log bodies in requests. It is just your opinion that there "has to be something that logs the body easily". Why do you have to log the body? State your case.Solfeggio
And then open an issue at spring stating why you need it and they might implement it.Solfeggio
At some point you pass the request body to the web client. Can you do the logging at that point?Recliner
I would have to add logging per method or implement wrapper around WebClient which does not sound like a good solution to me. I want a centralised place for logging http request/response which I can share between multiple applications inside the networkKnitting
@ThomasAndolf That is an opinion based opinion from you that nobody should want to log the body in an easy way, because you personally have never felt the need to log bodies in requests. :) IMHO it actually does make sense a lot, if you want to audit/debug, what was actually sent over the wire and standard logging doesn't cut it, because you have to mask credentials (because of PCI compliance for instance)Orozco
@StefanHaberl that was exactly what i wrote, it is MY opinion, and i wanted to hear HIS case. I, and only ME no one else, just me, considers logging for debugging is a code smell and debuggers should be used. But that is MY personal opinion, and NO ONE elses. And logging requests just for logging a request, you should log what is needed by authorities for auditing, usually this does NOT include logging full requests, logging full requests is a security issue in accordance to GDPR that non compliant values are accidentally logged.Solfeggio
K
4

Coming back to the topic with an answer I am satisfied with so far.

In the following example I created an HttpClient with a LoggingHandler that does the logging, injected it into a ReactorClientHttpConnector (the default connector; see DefaultWebClientBuilder#initConnector), and then passed that to the WebClient.

val baseHttpClient = HttpClient.create()
    .doOnRequest(
        (request, conn) -> conn.addHandlerFirst(new LoggingHandler(LogLevel.INFO))
    );

val httpClient = WebClient.builder()
    .baseUrl("https://google.pl")
    .clientConnector(new ReactorClientHttpConnector(baseHttpClient))
    .build();

val response = httpClient.post()
    .body(Mono.just("Your request body"), String.class)
    .exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class))
    .block();

I'm still planning on creating a custom LoggingHandler which will sanitise and simplify the logs.
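
As a rough illustration of the sanitising step, here is a minimal sketch of a body masker that a custom handler could call before logging. BodySanitizer and the list of field names are hypothetical, not part of Spring or Netty, and the regex only covers JSON string values (not numbers or nested objects):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring or Netty: masks the values of
// sensitive JSON string fields in a body before it is logged.
final class BodySanitizer {

    // Assumed field names; adjust to whatever counts as a credential in your payloads.
    // Group 1 = key, colon and opening quote; group 2 = value; group 3 = closing quote.
    private static final Pattern SENSITIVE_FIELD = Pattern.compile(
            "(\"(?:password|token|secret|cardNumber)\"\\s*:\\s*\")((?:[^\"\\\\]|\\\\.)*)(\")",
            Pattern.CASE_INSENSITIVE);

    private BodySanitizer() {
    }

    static String sanitize(String body) {
        if (body == null) {
            return null;
        }
        Matcher matcher = SENSITIVE_FIELD.matcher(body);
        // Keep the key and the quotes, replace only the value.
        return matcher.replaceAll("$1***$3");
    }
}
```

A handler would then log BodySanitizer.sanitize(bodyAsString) instead of the raw string. For anything beyond simple flat JSON, parsing the payload with a JSON library is likely more robust than a regex.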

Knitting answered 12/6, 2022 at 21:6 Comment(2)
Thanks for this variant, updated my answer with the custom LoggingHandler. This is much more clear and straightforward so I will use itDisqualification
Accepted my own answer as I'm satisfied with the implementation, it's working nicely in production environment.Knitting
D
6

I tried all the answers, but some of them don't fit my needs or simply don't work. I wrote my own solution, based on these answers, to intercept request/response bodies and log them.

@Slf4j
@Component
public class LoggingCustomizer implements WebClientCustomizer {

    @Override public void customize(WebClient.Builder webClientBuilder) {
        webClientBuilder.filter((request, next) -> {
            logRequest(request);
            return next
                .exchange(interceptBody(request))
                .doOnNext(this::logResponse)
                .map(this::interceptBody);
        });
    }

    private ClientRequest interceptBody(ClientRequest request) {
        return ClientRequest.from(request)
            .body((outputMessage, context) -> request.body().insert(new ClientHttpRequestDecorator(outputMessage) {
                @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    return super.writeWith(Flux.from(body) // Flux, not Mono: the body may span several buffers
                        .doOnNext(dataBuffer -> logRequestBody(dataBuffer)));
                }
            }, context))
            .build();
    }

    private ClientResponse interceptBody(ClientResponse response) {
        return response.mutate()
            .body(data -> data.doOnNext(this::logResponseBody))
            .build();
    }

    private void logRequest(ClientRequest request) {
        log.debug("DOWNSTREAM REQUEST: METHOD {}, URI: {}, HEADERS: {}", request.method(), request.url(), request.headers());
    }

    private void logRequestBody(DataBuffer dataBuffer) {
        log.debug("DOWNSTREAM REQUEST: BODY: {}", dataBuffer.toString(StandardCharsets.UTF_8));
    }

    private void logResponse(ClientResponse response) {
        log.debug("DOWNSTREAM RESPONSE: STATUS: {}, HEADERS: {}", response.rawStatusCode(), response.headers().asHttpHeaders());
    }

    private void logResponseBody(DataBuffer dataBuffer) {
        log.debug("DOWNSTREAM RESPONSE: BODY: {}", dataBuffer.toString(StandardCharsets.UTF_8));
    }

}
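
Note that the snippet above logs headers verbatim, which can leak credentials such as Authorization. The following is a minimal sketch of redacting them first, under the assumption that the headers are available as a plain Map<String, List<String>>. HeaderRedactor and its list of sensitive names are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper: replaces sensitive header values with a short SHA-256
// fingerprint so log lines can still be correlated without exposing the secret.
final class HeaderRedactor {

    // Assumed list of sensitive header names, stored in lower case.
    private static final Set<String> SENSITIVE = Set.of("authorization", "cookie", "x-api-key");

    private HeaderRedactor() {
    }

    static Map<String, List<String>> redact(Map<String, List<String>> headers) {
        Map<String, List<String>> safe = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        headers.forEach((name, values) -> {
            if (SENSITIVE.contains(name.toLowerCase(Locale.ROOT))) {
                List<String> hashed = new ArrayList<>();
                for (String value : values) {
                    hashed.add("sha256:" + fingerprint(value));
                }
                safe.put(name, hashed);
            } else {
                safe.put(name, List.copyOf(values));
            }
        });
        return safe;
    }

    // First 4 bytes of the SHA-256 digest, rendered as 8 hex characters.
    static String fingerprint(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < 4; i++) {
                hex.append(String.format("%02x", digest[i] & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should be available on every JVM", e);
        }
    }
}
```

A truncated hash only helps to tell whether two requests used the same credential; it does not protect low-entropy secrets, which can be guessed by brute force.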

Update: added a snippet that logs using reactor.netty.http.client.HttpClient (the preferable solution)

@Slf4j
@Component
public class LoggingCustomizer implements WebClientCustomizer {

    @Override public void customize(WebClient.Builder webClientBuilder) {
        HttpClient httpClient = HttpClient.create()
            .doOnRequest((httpClientRequest, connection) -> connection.addHandlerFirst(new LoggingHandler()));
        webClientBuilder.clientConnector(new ReactorClientHttpConnector(httpClient));
    }

    private static class LoggingHandler extends ChannelDuplexHandler {

        @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (msg instanceof FullHttpRequest request) {
                log.debug("DOWNSTREAM  REQUEST: METHOD: {}, URI: {}, BODY: {}, HEADERS: {}",
                    request.method(), request.uri(), request.content().toString(defaultCharset()), request.headers());
            } else if (msg instanceof HttpRequest request) {
                log.debug("DOWNSTREAM  REQUEST: METHOD: {}, URI: {}, HEADERS: {}",
                    request.method(), request.uri(), request.headers());
            } else if (msg instanceof FullHttpMessage message) {
                log.debug("DOWNSTREAM  REQUEST: BODY: {}",
                    message.content().toString(defaultCharset()));
            }
            super.write(ctx, msg, promise);
        }

        @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof FullHttpResponse response) {
                log.debug("DOWNSTREAM RESPONSE: STATUS: {}, BODY: {}, HEADERS: {}",
                    response.status().code(), response.content().toString(defaultCharset()), response.headers());
            } else if (msg instanceof HttpResponse response) {
                log.debug("DOWNSTREAM RESPONSE: STATUS: {}, HEADERS: {}",
                    response.status().code(), response.headers());
            } else if (!(msg instanceof LastHttpContent) && msg instanceof HttpContent httpContent) {
                log.debug("DOWNSTREAM RESPONSE: BODY: {}",
                    httpContent.content().toString(defaultCharset()));
            }
            super.channelRead(ctx, msg);
        }
    }

}
Disqualification answered 10/6, 2022 at 15:9 Comment(2)
Nice that you used WebClientCustomizer, which makes it a cleaner solution. On the downside, if I understand this correctly, I can't have a different sanitization per WebClient; I would have to use a global oneKnitting
Unfortunately in my case the msg was an instance of DefaultHttpRequest with no access to the message body. Is there a solution for this case?Fahey
S
2

You can create your own wrapper/proxy class around the JSON encoder and intercept the serialized body before it is sent into the intertubes.

This blog post shows how to log the JSON payloads of WebClient requests and responses

Specifically, you would extend the encodeValue method (or encodeValues in case of streaming data) of Jackson2JsonEncoder. Then you can do with that data what you wish, such as logging etc. And you could even do this conditionally based on environment/profile

This custom logging-encoder can be specified when creating the WebClient, by codecs:

 CustomBodyLoggingEncoder bodyLoggingEncoder = new CustomBodyLoggingEncoder();
 WebClient.builder()
          .codecs(clientDefaultCodecsConfigurer -> {
             clientDefaultCodecsConfigurer.defaultCodecs().jackson2JsonEncoder(bodyLoggingEncoder);
             clientDefaultCodecsConfigurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(new ObjectMapper(), MediaType.APPLICATION_JSON));
          })
          ...

Update 2020/7/3:

Here is a rushed example applying the same principle but for a decoder:

public class LoggingJsonDecoder extends Jackson2JsonDecoder {
    private final Consumer<byte[]> payloadConsumer;

    public LoggingJsonDecoder(final Consumer<byte[]> payloadConsumer) {
        this.payloadConsumer = payloadConsumer;
    }

    @Override
    public Mono<Object> decodeToMono(final Publisher<DataBuffer> input, final ResolvableType elementType, final MimeType mimeType, final Map<String, Object> hints) {
        // Buffer for bytes from each published DataBuffer
        final ByteArrayOutputStream payload = new ByteArrayOutputStream();

        // Augment the Flux, and intercept each group of bytes buffered
        final Flux<DataBuffer> interceptor = Flux.from(input)
                                                 .doOnNext(buffer -> bufferBytes(payload, buffer))
                                                 .doOnComplete(() -> payloadConsumer.accept(payload.toByteArray()));

        // Return the original method, giving our augmented Publisher
        return super.decodeToMono(interceptor, elementType, mimeType, hints);
    }

    private void bufferBytes(final ByteArrayOutputStream bao, final DataBuffer buffer) {
        try {
            bao.write(ByteUtils.extractBytesAndReset(buffer));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

You would configure that along with the encoder using the codecs builder method on WebClient. Of course, the above only works if your data is deserialized to a Mono; override the other methods if you need them. Also, I'm just writing the resulting JSON to stdout there, but you could pass in a Consumer<String> or similar for the decoder to send the string to, or just log from there; it's up to you.
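
To illustrate the Consumer<String> idea, here is a small sketch of an adapter that turns a string sink into the Consumer<byte[]> shape the decoder above expects. PayloadLogging and the maxChars parameter are hypothetical names, and UTF-8 is assumed as the payload encoding:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical adapter: decodes the captured bytes as UTF-8, truncates long
// payloads, and forwards the text to any Consumer<String> (logger, queue, ...).
final class PayloadLogging {

    private PayloadLogging() {
    }

    static Consumer<byte[]> asUtf8(Consumer<String> sink, int maxChars) {
        return bytes -> {
            String text = new String(bytes, StandardCharsets.UTF_8);
            sink.accept(text.length() <= maxChars
                    ? text
                    : text.substring(0, maxChars) + "...(truncated)");
        };
    }
}
```

It could then be passed to the decoder, for example as PayloadLogging.asUtf8(text -> log.debug("RESPONSE BODY: {}", text), 2048), where log is whatever logger you already use.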

A word of warning: in its current form this is going to double your memory usage, as it buffers the entire response. If you can send that byte data off to another process/thread to write to a log file or some output stream (or even a Flux) immediately, you could avoid buffering the whole payload in memory.
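
One way to bound that memory cost, if a truncated log entry is acceptable, is to keep only the first N bytes of the payload. A minimal sketch, where BoundedCapture is a hypothetical class that would stand in for the ByteArrayOutputStream in bufferBytes (one instance per exchange; it is not thread-safe):

```java
import java.io.ByteArrayOutputStream;

// Hypothetical bounded buffer: stores at most `limit` bytes for logging while
// still counting the total number of bytes that passed through.
final class BoundedCapture {

    private final ByteArrayOutputStream head = new ByteArrayOutputStream();
    private final int limit;
    private long total;

    BoundedCapture(int limit) {
        this.limit = limit;
    }

    void write(byte[] chunk) {
        total += chunk.length;
        int room = limit - head.size();
        if (room > 0) {
            // Copy only as much of the chunk as still fits under the limit.
            head.write(chunk, 0, Math.min(room, chunk.length));
        }
    }

    byte[] captured() {
        return head.toByteArray();
    }

    long totalBytes() {
        return total;
    }

    boolean truncated() {
        return total > head.size();
    }
}
```

The log line can then include captured() together with totalBytes(), so a reader can tell that the payload was cut off.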

Scarificator answered 18/6, 2020 at 14:39 Comment(3)
This works great for capturing the serialized request body, but I'm a little bit lost when it comes to capturing the response body before it is deserialized. Any advice?Orozco
@StefanHaberl Added more about response loggingScarificator
I would Use DataBufferUtils per my answer here - stackoverflow.com/a/72871244Bermuda
E
1

The request body can be accessed when the BodyInserter writes to the ReactiveHttpOutputMessage. So create an ExchangeFilterFunction that builds a new request from the existing one, but sets as its body a new BodyInserter that overrides the insert method; see the example below. Response and request payloads can be read multiple times since they are buffered in DataBuffers.

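public class TracingExchangeFilterFunction implements ExchangeFilterFunction {

    @Override
    public Mono<ClientResponse> filter(final ClientRequest request, final ExchangeFunction next) {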
        return next.exchange(buildTraceableRequest(request))
                .flatMap(response ->
                        response.body(BodyExtractors.toDataBuffers())
                                .next()
                                .doOnNext(dataBuffer -> traceResponse(response, dataBuffer))
                                .thenReturn(response)) ;
    }

    private ClientRequest buildTraceableRequest( 
            final ClientRequest clientRequest) {
        return ClientRequest.from(clientRequest).body(
                new BodyInserter<>() {
                    @Override
                    public Mono<Void> insert(
                            final ClientHttpRequest outputMessage,
                            final Context context) {
                        return clientRequest.body().insert(
                                new ClientHttpRequestDecorator(outputMessage) {
                                    @Override
                                    public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
                                        return super.writeWith(
                                                from(body).doOnNext(buffer ->
                                                        traceRequest(clientRequest, buffer)));
                                    }
                                }, context);
                    }
                }).build();
    }

    private void traceRequest(ClientRequest clientRequest, DataBuffer buffer) {
        final ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(buffer);
        final byte[] bytes = ByteBufUtil.getBytes(byteBuf);
        // do some tracing e.g. new String(bytes)
    }


    private void traceResponse(ClientResponse response, DataBuffer dataBuffer) {
        final byte[] bytes = new byte[dataBuffer.readableByteCount()];
        dataBuffer.read(bytes);
        // do some tracing e.g. new String(bytes)
    }
}
Ethereal answered 3/2, 2022 at 22:35 Comment(1)
Your class does not compile. There are many issues with your code. For starters the class must override the filter function of the interface :-(Fahey
B
1

Consider using an exchange filter function that wraps the request object and intercepts the body being written: it buffers the body and hands it to a callback once the body is assembled. Apparently this is not something the Spring team wants to support in the library itself, since buffering goes against the async paradigm, but here is an example of such an exchange filter function:

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpRequestDecorator;
import org.springframework.lang.NonNull;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import reactor.core.publisher.Mono;

import java.util.function.BiConsumer;


class BodyBufferingExchangeFilterFunction implements ExchangeFilterFunction {

    private final BiConsumer<ClientHttpRequest, DataBuffer> callback;

    // Default callback, e.g. set the Content-Length header from the buffered body
    public BodyBufferingExchangeFilterFunction() {
        this((ClientHttpRequest req, DataBuffer body) ->
                req.getHeaders().setContentLength(body.readableByteCount()));
    }

    // Custom callback, e.g. to log or inspect the buffered body
    public BodyBufferingExchangeFilterFunction(BiConsumer<ClientHttpRequest, DataBuffer> callback) {
        this.callback = callback;
    }

    @Override
    @NonNull
    public Mono<ClientResponse> filter(@NonNull ClientRequest request,
                                       @NonNull ExchangeFunction next) {
        if (request.method() == HttpMethod.PUT ||
                request.method() == HttpMethod.POST) {
            ClientRequest buffered = ClientRequest.from(request)
                    .body((ClientHttpRequest outputMessage, BodyInserter.Context context) ->
                            request.body().insert(new BufferingDecorator(outputMessage, callback), context))
                    .build();
            return next.exchange(buffered);
        } else {
            return next.exchange(request);
        }
    }

    private static final class BufferingDecorator extends ClientHttpRequestDecorator {
        private final BiConsumer<ClientHttpRequest, DataBuffer> callback;

        public BufferingDecorator(ClientHttpRequest outputMessage,
                                  BiConsumer<ClientHttpRequest, DataBuffer> callback) {
            super(outputMessage);
            this.callback = callback;
        }

        @Override
        @NonNull
        public Mono<Void> writeWith(@NonNull Publisher<? extends DataBuffer> body) {
            return DataBufferUtils.join(body).flatMap(buffer -> {
                if (callback != null) callback.accept(this, buffer);
                return super.writeWith(Mono.just(buffer));
            });
        }

    }

}

Here's how you'd use it with a webclient too:

// new webclient
WebClient webClient = WebClient.builder()
        .filter(new BodyBufferingExchangeFilterFunction())
        .build();

// existing webclient
WebClient webClient = getWebClientSomeHow();
webClient = webClient.mutate().filter(new BodyBufferingExchangeFilterFunction()).build();

ResponseEntity<String> somePost = webClient.post()
        .body(Mono.just("someData"), String.class)
        .retrieve().toEntity(String.class)
        .block();
Bermuda answered 5/7, 2022 at 14:37 Comment(2)
You set callback and then in the constructor you override the callback value. The constructor requires one parameter but when you instantiate the class there is no parameter passed in. Was your intention to use DataBuffer in the call back to access the body?Fahey
it was just to demonstrate the example, yes, use the callback to access the bodyBermuda
G
0

Try setting the following properties:

logging.level.org.springframework.web.reactive.function.client.ExchangeFunctions=TRACE
logging.level.reactor.netty.http.client.HttpClient=DEBUG
spring.http.log-request-details=true
Glucosuria answered 19/7, 2019 at 9:5 Comment(1)
I won't have control over what/how it is being loggedKnitting
T
0

In just a few lines this will log everything:

 import reactor.netty.tcp.TcpClient;
 import reactor.netty.resources.ConnectionProvider;

 TcpClient tcpClient = TcpClient.create(ConnectionProvider.newConnection());
 WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient).wiretap(true)))
            .filter(...
Tessera answered 26/9, 2022 at 21:41 Comment(0)
