How to log request and response bodies in Spring WebFlux
Asked Answered
T

13

63

I want to have centralised logging for requests and responses in my REST API on Spring WebFlux with Kotlin. So far I've tried this approaches

@Bean
fun apiRouter() = router {
    (accept(MediaType.APPLICATION_JSON) and "/api").nest {
        "/user".nest {
            GET("/", userHandler::listUsers)
            POST("/{userId}", userHandler::updateUser)
        }
    }
}.filter { request, next ->
    logger.info { "Processing request $request with body ${request.bodyToMono<String>()}" }
    next.handle(request).doOnSuccess { logger.info { "Handling with response $it" } }
}

Here request method and path log successfully but the body is Mono, so how should I log it? Should it be the other way around and I have to subscribe on request body Mono and log it in the callback? Another problem is that ServerResponse interface here doesn't have access to the response body. How can I get it here?


Another approach I've tried is using WebFilter

@Bean
fun loggingFilter(): WebFilter =
        WebFilter { exchange, chain ->
            val request = exchange.request
            logger.info { "Processing request method=${request.method} path=${request.path.pathWithinApplication()} params=[${request.queryParams}] body=[${request.body}]"  }

            val result = chain.filter(exchange)

            logger.info { "Handling with response ${exchange.response}" }

            return@WebFilter result
        }

Same problem here: request body is Flux and no response body.

Is there a way to access full request and response for logging from some filters? What don't I understand?

Talie answered 21/7, 2017 at 14:17 Comment(1)
This post has the code (Java) to log request body - #61707448Anaheim
V
27

This is more or less similar to the situation in Spring MVC.

In Spring MVC, you can use a AbstractRequestLoggingFilter filter and ContentCachingRequestWrapper and/or ContentCachingResponseWrapper. Many tradeoffs here:

  • if you'd like to access servlet request attributes, you need to actually read and parse the request body
  • logging the request body means buffering the request body, which can use a significant amount of memory
  • if you'd like to access the response body, you need to wrap the response and buffer the response body as it's being written, for later retrieval

ContentCaching*Wrapper classes don't exist in WebFlux but you could create similar ones. But keep in mind other points here:

  • buffering data in memory somehow goes against the reactive stack, since we're trying there to be very efficient with the available resources
  • you should not tamper with the actual flow of data and flush more/less often than expected, otherwise you'd risk breaking streaming uses cases
  • at that level, you only have access to DataBuffer instances, which are (roughly) memory-efficient byte arrays. Those belong to buffer pools and are recycled for other exchanges. If those aren't properly retained/released, memory leaks are created (and buffering data for later consumption certainly fits that scenario)
  • again at that level, it's only bytes and you don't have access to any codec to parse the HTTP body. I'd forget about buffering the content if it's not human-readable in the first place

Other answers to your question:

  • yes, the WebFilter is probably the best approach
  • no, you shouldn't subscribe to the request body otherwise you'd consume data that the handler won't be able to read; you can flatMap on the request and buffer data in doOn operators
  • wrapping the response should give you access to the response body as it's being written; don't forget about memory leaks, though
Vittle answered 24/7, 2017 at 12:33 Comment(4)
Thank you for a detailed answer. Looks like such high-level filtering (and logging) goes against core reactive ideology and I should consider moving logging to the business level (at least for responses)Talie
@brian-clozel, what do you mean "flatMap on the request"? Can you elaborate?Fortalice
Can you elaborate on the retain/release model? I see it used in StringDecoder, but don't quite understand. The PooledDataBuffer docs are useless in that regard.Elastin
concerning "why would you want to do that": I have a use-case where we have to persist every "message" (requests/responses both downstream and upstream) for visibility purposes. I could accept Strings in my controller methods, and do the parsing myself, but that's .. mean? I could also serialize POJOs before persisting but that just wastes resources.. so I thought, may be there's a way to "sneak in" WebFlux/Netty pipeline so I could keep representation of a request body in memory while it's being processed, for persistence purposes. not sure why that would be worse than manual serialization.Baughman
L
18

This is what I came up with for java.

public class RequestResponseLoggingFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest httpRequest = exchange.getRequest();
        final String httpUrl = httpRequest.getURI().toString();

        ServerHttpRequestDecorator loggingServerHttpRequestDecorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
            String requestBody = "";

            @Override
            public Flux<DataBuffer> getBody() {
                return super.getBody().doOnNext(dataBuffer -> {
                    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                        Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        requestBody = IOUtils.toString(byteArrayOutputStream.toByteArray(), "UTF-8");
                        commonLogger.info(LogMessage.builder()
                                .step(httpUrl)
                                .message("log incoming http request")
                                .stringPayload(requestBody)
                                .build());
                    } catch (IOException e) {
                        commonLogger.error(LogMessage.builder()
                                .step("log incoming request for " + httpUrl)
                                .message("fail to log incoming http request")
                                .errorType("IO exception")
                                .stringPayload(requestBody)
                                .build(), e);
                    }
                });
            }
        };

        ServerHttpResponseDecorator loggingServerHttpResponseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
            String responseBody = "";
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                Mono<DataBuffer> buffer = Mono.from(body);
                return super.writeWith(buffer.doOnNext(dataBuffer -> {
                    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                        Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        responseBody = IOUtils.toString(byteArrayOutputStream.toByteArray(), "UTF-8");
                        commonLogger.info(LogMessage.builder()
                                .step("log outgoing response for " + httpUrl)
                                .message("incoming http request")
                                .stringPayload(responseBody)
                                .build());
                    } catch (Exception e) {
                        commonLogger.error(LogMessage.builder()
                                .step("log outgoing response for " + httpUrl)
                                .message("fail to log http response")
                                .errorType("IO exception")
                                .stringPayload(responseBody)
                                .build(), e);
                    }
                }));
            }
        };
        return chain.filter(exchange.mutate().request(loggingServerHttpRequestDecorator).response(loggingServerHttpResponseDecorator).build());
    }

}
Lunge answered 9/6, 2020 at 1:5 Comment(3)
Note that the approach with overriding of the getBody() method only works for controllers which access the body (see @RequestBody).Bioastronautics
what is commonLogger here?Fred
@GabrielGarcíaGarrido custom class that I created for logging purposes - log in json format. It's a wrapper on top of slf4j logger.Lunge
F
13

I didn't find a good way to log request/response bodies, but if you are just interested in meta data then you can do it like follows.

import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter
import org.springframework.web.server.WebFilterChain
import reactor.core.publisher.Mono

@Component
class LoggingFilter(val requestLogger: RequestLogger, val requestIdFactory: RequestIdFactory) : WebFilter {
    val logger = logger()

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        logger.info(requestLogger.getRequestMessage(exchange))
        val filter = chain.filter(exchange)
        exchange.response.beforeCommit {
            logger.info(requestLogger.getResponseMessage(exchange))
            Mono.empty()
        }
        return filter
    }
}

@Component
class RequestLogger {

    fun getRequestMessage(exchange: ServerWebExchange): String {
        val request = exchange.request
        val method = request.method
        val path = request.uri.path
        val acceptableMediaTypes = request.headers.accept
        val contentType = request.headers.contentType
        return ">>> $method $path ${HttpHeaders.ACCEPT}: $acceptableMediaTypes ${HttpHeaders.CONTENT_TYPE}: $contentType"
    }

    fun getResponseMessage(exchange: ServerWebExchange): String {
        val request = exchange.request
        val response = exchange.response
        val method = request.method
        val path = request.uri.path
        val statusCode = getStatus(response)
        val contentType = response.headers.contentType
        return "<<< $method $path HTTP${statusCode.value()} ${statusCode.reasonPhrase} ${HttpHeaders.CONTENT_TYPE}: $contentType"
    }

    private fun getStatus(response: ServerHttpResponse): HttpStatus =
        try {
            response.statusCode
        } catch (ex: Exception) {
            HttpStatus.CONTINUE
        }
}
Ferric answered 25/8, 2017 at 8:54 Comment(2)
I get an http 100 status every time with this approach (because response.statusCode is null). So far I have not been able to figure out how to properly get the status code of the response in a WebFilter. Anyone know?Vtehsta
friggin "var" stuff.Concierge
C
5

You can actually enable DEBUG logging for Netty and Reactor-Netty related to see full picture of what's happening. You could play with the below and see what you want and don't. That was the best I could.

reactor.ipc.netty.channel.ChannelOperationsHandler: DEBUG
reactor.ipc.netty.http.server.HttpServer: DEBUG
reactor.ipc.netty.http.client: DEBUG
io.reactivex.netty.protocol.http.client: DEBUG
io.netty.handler: DEBUG
io.netty.handler.proxy.HttpProxyHandler: DEBUG
io.netty.handler.proxy.ProxyHandler: DEBUG
org.springframework.web.reactive.function.client: DEBUG
reactor.ipc.netty.channel: DEBUG
Ceremony answered 26/2, 2018 at 9:13 Comment(2)
This is an option for local debugging, but we can't use this in production instances since it will expose the header contents as well.Sweltering
i just added this. it did not even show a single logCercus
C
5

Since Spring Boot 2.2.x, Spring Webflux supports Kotlin coroutines. With coroutines, you can have the advantages of non-blocking calls without having to handle Mono and Flux wrapped objects. It adds extensions to ServerRequest and ServerResponse, adding methods like ServerRequest#awaitBody() and ServerResponse.BodyBuilder.bodyValueAndAwait(body: Any). So you could rewrite you code like this:

@Bean
fun apiRouter() = coRouter {
    (accept(MediaType.APPLICATION_JSON) and "/api").nest {
        "/user".nest {
            /* the handler methods now use ServerRequest and ServerResponse directly
             you just need to add suspend before your function declaration:
             suspend fun listUsers(ServerRequest req, ServerResponse res) */ 
            GET("/", userHandler::listUsers)
            POST("/{userId}", userHandler::updateUser)
        }
    }

    // this filter will be applied to all routes built by this coRouter
    filter { request, next ->
      // using non-blocking request.awayBody<T>()
      logger.info("Processing $request with body ${request.awaitBody<String>()}")
        val res = next(request)
        logger.info("Handling with Content-Type ${res.headers().contentType} and status code ${res.rawStatusCode()}")
        res 
    }
}

In order to create a WebFilter Bean with coRoutines, I think you can use this CoroutineWebFilter interface (I haven't tested it, I don't know if it works).

Canna answered 4/2, 2021 at 3:36 Comment(0)
F
4

I am pretty new to Spring WebFlux, and I don't know how to do it in Kotlin, but should be the same as in Java using WebFilter:

public class PayloadLoggingWebFilter implements WebFilter {

    public static final ByteArrayOutputStream EMPTY_BYTE_ARRAY_OUTPUT_STREAM = new ByteArrayOutputStream(0);

    private final Logger logger;
    private final boolean encodeBytes;

    public PayloadLoggingWebFilter(Logger logger) {
        this(logger, false);
    }

    public PayloadLoggingWebFilter(Logger logger, boolean encodeBytes) {
        this.logger = logger;
        this.encodeBytes = encodeBytes;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        if (logger.isInfoEnabled()) {
            return chain.filter(decorate(exchange));
        } else {
            return chain.filter(exchange);
        }
    }

    private ServerWebExchange decorate(ServerWebExchange exchange) {
        final ServerHttpRequest decorated = new ServerHttpRequestDecorator(exchange.getRequest()) {

            @Override
            public Flux<DataBuffer> getBody() {

                if (logger.isDebugEnabled()) {
                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    return super.getBody().map(dataBuffer -> {
                        try {
                            Channels.newChannel(baos).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        } catch (IOException e) {
                            logger.error("Unable to log input request due to an error", e);
                        }
                        return dataBuffer;
                    }).doOnComplete(() -> flushLog(baos));

                } else {
                    return super.getBody().doOnComplete(() -> flushLog(EMPTY_BYTE_ARRAY_OUTPUT_STREAM));
                }
            }

        };

        return new ServerWebExchangeDecorator(exchange) {

            @Override
            public ServerHttpRequest getRequest() {
                return decorated;
            }

            private void flushLog(ByteArrayOutputStream baos) {
                ServerHttpRequest request = super.getRequest();
                if (logger.isInfoEnabled()) {
                    StringBuffer data = new StringBuffer();
                    data.append('[').append(request.getMethodValue())
                        .append("] '").append(String.valueOf(request.getURI()))
                        .append("' from ")
                            .append(
                                Optional.ofNullable(request.getRemoteAddress())
                                            .map(addr -> addr.getHostString())
                                        .orElse("null")
                            );
                    if (logger.isDebugEnabled()) {
                        data.append(" with payload [\n");
                        if (encodeBytes) {
                            data.append(new HexBinaryAdapter().marshal(baos.toByteArray()));
                        } else {
                            data.append(baos.toString());
                        }
                        data.append("\n]");
                        logger.debug(data.toString());
                    } else {
                        logger.info(data.toString());
                    }

                }
            }
        };
    }

}

Here some tests on this: github

I think this is what Brian Clozel (@brian-clozel) meant.

Frolic answered 21/12, 2017 at 19:24 Comment(5)
This is flies in the face of reactive programming, where you're buffering the whole content. Certainly not what Brian said.Elastin
I found this example helpful. I could save the request JSON in a database using this mechanism to maintain an audit trail.Apiary
@Frolic This worked only for POST requests where getBody() is being called. If I have to invoke the flushLog for GET requests what should be done? getBody() is not being called in that case.Smetana
I did a hack, I override the getMethodValue() and invoked flushLog if the request type is GET.Smetana
Note that the approach with overriding of the getBody() method only works for controllers which access the body (see @RequestBody).Bioastronautics
E
2

What Brian said. In addition, logging request/response bodies don't make sense for reactive streaming. If you imagine the data flowing through a pipe as a stream, you don't have the full content at any time unless you buffer it, which defeats the whole point. For small request/response, you can get away with buffering, but then why use the reactive model (other than to impress your coworkers :-) )?

The only reason for logging request/response that I could conjure up is debugging, but with the reactive programming model, debugging method has to be modified too. Project Reactor doc has an excellent section on debugging that you can refer to: http://projectreactor.io/docs/core/snapshot/reference/#debugging

Elastin answered 1/1, 2018 at 21:50 Comment(5)
Its for debugging during development. No one enables debugging in production. I have explained in details in my other post on why debugging is required #47597071Bonspiel
Here an scenario I fill has nothing to do with debugging, suppose you have a retry policy configured to act when, lets say, HttpStatus 503/504 is returned, It seems to me that to be able to log this response is much more than simply debugging thing, it might give us useful information about the reason why this is happening in a given moment if API does so, so I hope I found a way to this in a reasonable not 500 code lines as I've seen around after of a couple of days of looking for it, really frustated with this basic thing.Typhoid
If you have an objective of analyzing what data your clients receive and you need to store every response for that, how would you do it then? I was thinking about having parallel coroutines to perform this, while the main thread would operate as normalReproduction
@LeonidBor I'd start by posting a new question without hijacking a thread from 5 years ago.Elastin
@AbhijitSarkar I'm afraid I cannot create a new question on the same topic per StackOverflow community guidelines. I'm still looking for a good way to achieve the goal, so far the best direction seems to be manually call DataBufferUtils.retain(buffer) in the event loop thread and then DataBufferUtils.release(buffer) in logging thread once logging operation is completeReproduction
U
2

Here is the GitHub Repo with complete implementation to log both request and response body along with http headers for webflux/java based application...

Untouchability answered 12/6, 2020 at 5:56 Comment(0)
B
1

Assuming we are dealing with a simple JSON or XML response, if debug level for corresponding loggers is not sufficient for some reason, one can use string representation before transforming it to object:

Mono<Response> mono = WebClient.create()
                               .post()
                               .body(Mono.just(request), Request.class)
                               .retrieve()
                               .bodyToMono(String.class)
                               .doOnNext(this::sideEffectWithResponseAsString)
                               .map(this::transformToResponse);

the following are the side-effect and transformation methods:

private void sideEffectWithResponseAsString(String response) { ... }
private Response transformToResponse(String response) { /*use Jackson or JAXB*/ }    
Beastly answered 27/12, 2018 at 14:41 Comment(0)
K
1

If your using controller instead of handler best way is aop with annotating you controller class with @Log annotation.And FYI this takes plain json object as request not mono.

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class Log

@Aspect
@Component
class LogAspect {
    companion object {
        val log = KLogging().logger
    }

    @Around("@annotation(Log)")
    @Throws(Throwable::class)
    fun logAround(joinPoint: ProceedingJoinPoint): Any? {
        val start = System.currentTimeMillis()
        val result = joinPoint.proceed()
        return if (result is Mono<*>) result.doOnSuccess(getConsumer(joinPoint, start)) else result
    }

    fun getConsumer(joinPoint: ProceedingJoinPoint, start: Long): Consumer<Any>? {
        return Consumer {
            var response = ""
            if (Objects.nonNull(it)) response = it.toString()
            log.info(
                "Enter: {}.{}() with argument[s] = {}",
                joinPoint.signature.declaringTypeName, joinPoint.signature.name,
                joinPoint.args
            )
            log.info(
                "Exit: {}.{}() had arguments = {}, with result = {}, Execution time = {} ms",
                joinPoint.signature.declaringTypeName, joinPoint.signature.name,
                joinPoint.args[0],
                response, System.currentTimeMillis() - start
            )
        }
    }
}
Karlise answered 6/10, 2020 at 1:3 Comment(0)
J
1

Ivan Lymar's answer but in Kotlin:

import org.apache.commons.io.IOUtils
import org.reactivestreams.Publisher
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.http.server.reactive.ServerHttpRequestDecorator
import org.springframework.http.server.reactive.ServerHttpResponseDecorator
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter
import org.springframework.web.server.WebFilterChain
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.nio.channels.Channels

@Component
class LoggingWebFilter : WebFilter {

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        val httpRequest = exchange.request
        val httpUrl = httpRequest.uri.toString()
        val loggingServerHttpRequestDecorator: ServerHttpRequestDecorator =
            object : ServerHttpRequestDecorator(exchange.request) {
                var requestBody = ""
                override fun getBody(): Flux<DataBuffer> {
                    return super.getBody().doOnNext { dataBuffer: DataBuffer ->
                        try {
                            ByteArrayOutputStream().use { byteArrayOutputStream ->
                                Channels.newChannel(byteArrayOutputStream)
                                    .write(dataBuffer.asByteBuffer().asReadOnlyBuffer())
                                requestBody =
                                    IOUtils.toString(
                                        byteArrayOutputStream.toByteArray(),
                                        "UTF-8"
                                    )
                                log.info(
                                    "Logging Request Filter: {} {}",
                                    httpUrl,
                                    requestBody
                                )
                            }
                        } catch (e: IOException) {
                            log.error(
                                "Logging Request Filter Error: {} {}",
                                httpUrl,
                                requestBody,
                                e
                            )
                        }
                    }
                }
            }

        val loggingServerHttpResponseDecorator: ServerHttpResponseDecorator =
            object : ServerHttpResponseDecorator(exchange.response) {
                var responseBody = ""
                override fun writeWith(body: Publisher<out DataBuffer>): Mono<Void> {
                    val buffer: Mono<DataBuffer> = Mono.from(body)
                    return super.writeWith(
                        buffer.doOnNext { dataBuffer: DataBuffer ->
                            try {
                                ByteArrayOutputStream().use { byteArrayOutputStream ->
                                    Channels.newChannel(byteArrayOutputStream)
                                        .write(
                                            dataBuffer
                                                .asByteBuffer()
                                                .asReadOnlyBuffer()
                                        )
                                    responseBody = IOUtils.toString(
                                        byteArrayOutputStream.toByteArray(),
                                        "UTF-8"
                                    )
                                    log.info(
                                        "Logging Response Filter: {} {}",
                                        httpUrl,
                                        responseBody
                                    )
                                }
                            } catch (e: Exception) {
                                log.error(
                                    "Logging Response Filter Error: {} {}",
                                    httpUrl,
                                    responseBody,
                                    e
                                )
                            }
                        }
                    )
                }
            }
        return chain.filter(
            exchange.mutate().request(loggingServerHttpRequestDecorator)
                .response(loggingServerHttpResponseDecorator)
                .build()
        )
    }
}
Jaipur answered 12/6, 2022 at 20:30 Comment(1)
Note that the approach with overriding of the getBody() method only works for controllers which access the body (see @RequestBody).Bioastronautics
B
0

I think the appropriate thing to do here is to write the contents of each request to a file in an asynchronous manner (java.nio) and set up an interval that reads those request body files asynchrolusly and writes them to the log in a memory usage aware manner (atleast one file at a time but up too 100 mb at a time) and after logging them removes the files from disk.

Biotite answered 5/2, 2022 at 8:10 Comment(1)
Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center.Sky
S
0

For Spring Boot 3 and WebFlux you can just add this line to your application.properties:

logging.level.org.springframework.web.reactive.function.client.ExchangeFunctions=DEBUG

The output will look something like this (per call):

20:32:21.416  DEBUG [reactor-http-nio-3] ExchangeFunctions : [211ef3af] HTTP GET https://api.example.com/someresource
20:32:21.689  DEBUG [reactor-http-nio-3] ExchangeFunctions : [211ef3af [db45bfc1-1] Response 200 OK
Solute answered 7/7 at 18:45 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.