How to log raw HTTP request/response in Python FastAPI?
We are writing a web service using Python FastAPI that is going to be hosted in Kubernetes. For auditing purposes, we need to save the raw JSON body of the request/response for specific routes. The body size of both request and response JSON is about 1MB, and preferably, this should not impact the response time. How can we do that?

Pneumodynamics answered 22/10, 2021 at 0:22 Comment(1)
Related topic: You could use middleware or an APIRouter together with the loguru logger with the option enqueue=True. This will allow you to avoid affecting the response time. – Stagehand

You may try customizing APIRouter, as shown in the FastAPI official documentation:

import time
from typing import Callable

from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute


class TimedRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            before = time.time()
            response: Response = await original_route_handler(request)
            duration = time.time() - before
            response.headers["X-Response-Time"] = str(duration)
            print(f"route duration: {duration}")
            print(f"route response: {response}")
            print(f"route response headers: {response.headers}")
            return response

        return custom_route_handler


app = FastAPI()
router = APIRouter(route_class=TimedRoute)


@app.get("/")
async def not_timed():
    return {"message": "Not timed"}


@router.get("/timed")
async def timed():
    return {"message": "It's the time of my life"}


app.include_router(router)
Rms answered 15/1, 2022 at 11:15 Comment(0)

Option 1 - Using Middleware

You could use a Middleware. A middleware takes each request that comes to your application, and hence, allows you to handle the request before it is processed by any specific endpoint, as well as the response, before it is returned to the client. To create a middleware, you use the decorator @app.middleware("http") on top of a function, as shown below.

As you need to consume the request body from the stream inside the middleware—using either request.body() or request.stream(), as shown in this answer (behind the scenes, the former method actually calls the latter, see here)—then it won't be available when you later pass the request to the corresponding endpoint. Thus, you can follow the approach described in this post to make the request body available down the line (i.e., using the set_body() function below). UPDATE: This issue has now been fixed, and hence, there is no need to use that workaround, if you are using FastAPI 0.108.0 or later versions.

As for the response body, you can use the same approach as described in this answer to consume the body and then return the response to the client. Either option described in the aforementioned linked answer would work; the below, however, uses Option 2, which stores the body in a bytes object and returns a custom Response directly (along with the status_code, headers and media_type of the original response).

To log the data, you could use a BackgroundTask, as described in this answer, as well as this answer and this answer. A BackgroundTask will run only once the response has been sent (see Starlette documentation as well); thus, the client won't have to be waiting for the logging to complete before receiving the response (and hence, the response time won't be noticeably impacted).
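As a stdlib-only illustration of the same idea—keeping slow log I/O off the request path—the standard logging module's QueueHandler/QueueListener pair hands records to a background thread, so the caller only enqueues and returns. This is a minimal sketch (the logger name and bodies are illustrative, not part of the original answer):

```python
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

log_queue: queue.Queue = queue.Queue(-1)  # unbounded queue

# The listener thread owns the slow handler; swap StreamHandler for
# logging.FileHandler('info.log') in a real service.
listener = QueueListener(log_queue, logging.StreamHandler())
listener.start()

logger = logging.getLogger("audit")
logger.setLevel(logging.INFO)
logger.addHandler(QueueHandler(log_queue))
logger.propagate = False

def log_info(req_body: bytes, res_body: bytes) -> None:
    # Same signature as the log_info() helper used in the examples in this
    # answer; this call only enqueues, so the caller never blocks on disk I/O.
    logger.info("request: %s", req_body)
    logger.info("response: %s", res_body)

log_info(b'{"in": 1}', b'{"out": 1}')
listener.stop()  # drains the queue; call on application shutdown
```

With this setup, the same `log_info` can be passed to a BackgroundTask or called directly from a middleware without blocking the response.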

Note

If you had a streaming request or response with a body that wouldn't fit into your server's RAM (for example, imagine a body of 100GB on a machine running 8GB RAM), it would become problematic, as you are storing the data to RAM, which wouldn't have enough space available to accommodate the accumulated data.

Also, in case of a large response (e.g., a large FileResponse or StreamingResponse), you may be faced with Timeout errors on client side (or on reverse proxy side, if you are using one), as you would not be able to respond back to the client until you have read the entire response body (as you are looping over response.body_iterator).

You mentioned that "the body size of both request and response JSON is about 1MB"; hence, that should normally be fine (however, it is always good practice to consider beforehand matters such as how many requests your API is expected to serve concurrently, what other applications might be using the RAM, etc., in order to determine whether this is an issue or not). If you needed to, you could limit the number of requests to your API endpoints using, for example, SlowAPI (as shown in this answer).

Limiting the usage of the middleware to specific routes only

You could limit the usage of the middleware to specific endpoints by:

  • checking the request.url.path inside the middleware against a pre-defined list of routes for which you would like to log the request and response, as described in this answer (see "Update" section),
  • or using a sub application, as demonstrated in this answer
  • or using a custom APIRoute class, as demonstrated in Option 2 below.

Working Example

from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Dict, Any
import logging


app = FastAPI()
logging.basicConfig(filename='info.log', level=logging.DEBUG)


def log_info(req_body, res_body):
    logging.info(req_body)
    logging.info(res_body)


# not needed when using FastAPI>=0.108.0.
async def set_body(request: Request, body: bytes):
    async def receive() -> Message:
        return {'type': 'http.request', 'body': body}
    request._receive = receive


@app.middleware('http')
async def some_middleware(request: Request, call_next):
    req_body = await request.body()
    await set_body(request, req_body)  # not needed when using FastAPI>=0.108.0.
    response = await call_next(request)
    
    res_body = b''
    async for chunk in response.body_iterator:
        res_body += chunk
    
    task = BackgroundTask(log_info, req_body, res_body)
    return Response(content=res_body, status_code=response.status_code, 
        headers=dict(response.headers), media_type=response.media_type, background=task)


@app.post('/')
def main(payload: Dict[Any, Any]):
    return payload

In case you would like to perform some validation on the request body—for example, ensuring that the request body size does not exceed a certain value—instead of using request.body(), you can process the body one chunk at a time using the .stream() method, as shown below (similar to this answer).

@app.middleware('http')
async def some_middleware(request: Request, call_next):    
    req_body = b''
    async for chunk in request.stream():
        req_body += chunk
    ...

Option 2 - Using custom APIRoute class

You can alternatively use a custom APIRoute class—similar to here and here—which, among other things, would allow you to manipulate the request body before it is processed by your application, as well as the response body before it is returned to the client. This option also allows you to limit the usage of this class to the routes you wish, as only the endpoints under the APIRouter (i.e., router in the example below) will use the custom APIRoute class.

It should be noted that the comments in the "Note" section of Option 1 above apply to this option as well. For example, if your API returns a StreamingResponse—such as the /video route in the example below, which streams a video file from an online source (public videos to test this can be found here, and you can even use a longer video than the one used below to see the effect more clearly)—you may come across issues on server side, if your server's RAM can't handle it, as well as delays on client side (and on the reverse proxy server, if using one), due to the whole (streaming) response being read and stored in RAM before it is returned to the client (as explained earlier). In such cases, you could exclude endpoints that return a StreamingResponse from the custom APIRoute class and limit its usage to the desired routes only—especially if it is a large video file, or even live video, which would likely not make much sense to store in the logs—simply by not using the @<name_of_router> decorator (i.e., @router in the example below) for such endpoints, but rather the @<name_of_app> decorator (i.e., @app in the example below), or some other APIRouter or sub application.

Working Example

from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Callable, Dict, Any
import logging
import httpx


def log_info(req_body, res_body):
    logging.info(req_body)
    logging.info(res_body)

       
class LoggingRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            req_body = await request.body()
            response = await original_route_handler(request)
            tasks = response.background
            
            if isinstance(response, StreamingResponse):
                res_body = b''
                async for item in response.body_iterator:
                    res_body += item
                  
                task = BackgroundTask(log_info, req_body, res_body)
                response = Response(content=res_body, status_code=response.status_code, 
                        headers=dict(response.headers), media_type=response.media_type)
            else:
                task = BackgroundTask(log_info, req_body, response.body)
            
            # check if the original response had background tasks already attached to it
            if tasks:
                tasks.add_task(task)  # add the new task to the tasks list
                response.background = tasks
            else:
                response.background = task
                
            return response
            
        return custom_route_handler


app = FastAPI()
router = APIRouter(route_class=LoggingRoute)
logging.basicConfig(filename='info.log', level=logging.DEBUG)


@router.post('/')
def main(payload: Dict[Any, Any]):
    return payload


@router.get('/video')
def get_video():
    url = 'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4'
    
    def gen():
        with httpx.stream('GET', url) as r:
            for chunk in r.iter_raw():
                yield chunk

    return StreamingResponse(gen(), media_type='video/mp4')


app.include_router(router)
Atcliffe answered 23/8, 2022 at 19:12 Comment(9)
Option 1 works fine when a FastAPI request works OK. But the code is not executed when something goes wrong and an HTTPException such as `HTTPException(400, str(e))` is raised. How can I log the incoming request in these cases, too? – Lotetgaronne
@Lotetgaronne I had a quick look and the code works as expected, even when an HTTPException is raised. – Atcliffe
Yes, there was a problem with my code. Thanks for taking a look. BTW, I have not found a way to quickly see which of the router's endpoints was triggered, plus a way to bubble up information from the endpoint. Currently I misuse the response headers to pass values upwards to the logger, but that seems like a dirty hack to me. – Lotetgaronne
@Lotetgaronne You could use request.url.path, as shown here, to identify which endpoint was triggered. See this answer and this answer as well. – Atcliffe
Thanks, I've tried that and it works (but it seems not suitable for larger projects). In my real-world code I have the routes in separate files which are imported and attached like this: app.include_router(router=routes_status.router, prefix="/api", tags=["My API"]). These routers have a few endpoints each. For me, the obvious place to store the info on whether something should trigger the middleware is in that file, close to the endpoint, so the should-I-log info should "bubble up". That's why I am passing the info up via headers, but am looking for a better way. I should make that a new question... – Lotetgaronne
@Lotetgaronne You could store additional information on the Request object using request.state, as described here, as well as here and here. You might find this answer helpful as well. – Atcliffe
Thanks for those suggestions - that's very helpful. – Lotetgaronne
If using fastapi>0.106.0, then you no longer need the set_body() method described above in Option 1. If you try to use it, you will get the error Unexpected message received: http.request. This is because this 'quirk' of FastAPI has been fixed, so you no longer need to restore the request body after reading it. See more at github.com/tiangolo/fastapi/discussions/… – Sabadell
@ZoltanFedor The answer has been updated with the relevant details. I couldn't reproduce the error you mentioned though - the example worked as expected even when using the set_body() method (in FastAPI 0.108.0). Regardless, since the issue has now been fixed, there is no longer a need to call the set_body() method. – Atcliffe

As the other answers did not work for me and I searched quite extensively on Stack Overflow to fix this problem, I will show my solution below.

The main issue is that when working with the request body or response body, many of the approaches/solutions offered online simply do not work, as the request/response body is consumed when reading it from the stream.

To solve this issue, I adapted an approach that basically reconstructs the request and response after reading them. This is heavily based on the comment by user 'kovalevvlad' on https://github.com/encode/starlette/issues/495.

A custom middleware is created, which is later added to the app to log all requests and responses. Note that you need some kind of logger to store your logs.

from json import JSONDecodeError
import json
import logging
from typing import Callable, Awaitable, Tuple, Dict, List

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import Scope, Message

# Set up your custom logger here; a plain stdlib logger is used as a placeholder.
logger = logging.getLogger(__name__)

class RequestWithBody(Request):
    """Creation of new request with body"""
    def __init__(self, scope: Scope, body: bytes) -> None:
        super().__init__(scope, self._receive)
        self._body = body
        self._body_returned = False

    async def _receive(self) -> Message:
        if self._body_returned:
            return {"type": "http.disconnect"}
        else:
            self._body_returned = True
            return {"type": "http.request", "body": self._body, "more_body": False}


class CustomLoggingMiddleware(BaseHTTPMiddleware):
    """
    Use of custom middleware since reading the request body and the response consumes the bytestream.
    Hence this approach to basically generate a new request/response when we read the attributes for logging.
    """
    async def dispatch(  # type: ignore
        self, request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]]
    ) -> Response:
        # Store the request body in a variable and generate a new request, as the body gets consumed.
        request_body_bytes = await request.body()

        request_with_body = RequestWithBody(request.scope, request_body_bytes)

        # Store response body in a variable and generate new response as it is consumed.
        response = await call_next(request_with_body)
        response_content_bytes, response_headers, response_status = await self._get_response_params(response)

        # Logging

        # If there is no request body handle exception, otherwise convert bytes to JSON.
        try:
            req_body = json.loads(request_body_bytes)
        except JSONDecodeError:
            req_body = ""
        # Logging of relevant variables.
        logger.info(
            f"{request.method} request to {request.url} metadata\n"
            f"\tStatus_code: {response.status_code}\n"
            f"\tRequest_Body: {req_body}\n"
        )
        # Finally, return the newly instantiated response values
        return Response(response_content_bytes, response_status, response_headers)

    async def _get_response_params(self, response: StreamingResponse) -> Tuple[bytes, Dict[str, str], int]:
        """Get the response parameters of a response and create a new response."""
        response_byte_chunks: List[bytes] = []
        response_status: List[int] = []
        response_headers: List[Dict[str, str]] = []

        async def send(message: Message) -> None:
            if message["type"] == "http.response.start":
                response_status.append(message["status"])
                response_headers.append({k.decode("utf8"): v.decode("utf8") for k, v in message["headers"]})
            else:
                response_byte_chunks.append(message["body"])

        await response.stream_response(send)
        content = b"".join(response_byte_chunks)
        return content, response_headers[0], response_status[0]
Sideshow answered 22/8, 2022 at 14:35 Comment(2)
Thanks Jan! You saved me a lot of effort. – Lydialydian
As a note to this example, and the others - all sharing some similarities around recreating the original response: these all contain a bug where duplicate response headers (which are allowed per the HTTP spec, e.g. multiple set-cookie headers) will be removed, and only the last one will be kept. This happens due to the List[Tuple[bytes, bytes]] -> Dict[str, str] conversion. – Brisance

© 2022 - 2024 — McMap. All rights reserved.