What is the proper way to make downstream Https requests inside of Uvicorn/FastAPI?
I have an API endpoint (FastAPI / Uvicorn). Among other things, it makes a request to yet another API for information. When I load my API with multiple concurrent requests, I begin to receive the following error:

h11._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE

In a normal environment, I would take advantage of requests.Session, but I understand it is not fully thread-safe.

Thus, what is the proper approach to using requests within a framework such as FastAPI, where multiple threads would be using the requests library at the same time?

Ahmednagar answered 14/9, 2022 at 18:39
Instead of using requests, you could use httpx, which offers an async API as well. httpx is also suggested in FastAPI's documentation for performing async tests, and FastAPI/Starlette recently replaced the HTTP client underlying TestClient from requests with httpx.

The below example is based on the one given in httpx's documentation, demonstrating how to use the library to make an asynchronous HTTP(S) request and subsequently stream the response back to the client. httpx.AsyncClient() is what you can use instead of requests.Session(), and it is particularly useful when several requests are being made to the same host, as the underlying TCP connection will be reused instead of recreated for every single request, resulting in a significant performance improvement. It additionally allows you to reuse headers and other settings (such as proxies and timeout), as well as persist cookies, across requests.

You spawn a Client once and reuse it every time you need it. You can use await client.aclose() to explicitly close the client once you are done with it (you could do that inside a shutdown event handler). Examples and more details can also be found in this answer.
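For instance, a minimal sketch of a shared Client carrying reusable settings (the base URL, header, and timeout values below are illustrative assumptions, not defaults):

import httpx

# Settings defined once on the Client are applied to every request it sends
client = httpx.AsyncClient(
    base_url='https://www.example.com',        # hypothetical upstream host
    headers={'User-Agent': 'my-fastapi-app'},  # reused on every outgoing request
    timeout=httpx.Timeout(10.0),               # ten-second timeout for all operations
)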

Example

from fastapi import FastAPI
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse
import httpx


app = FastAPI()


@app.on_event("startup")
async def startup_event():
    app.state.client = httpx.AsyncClient()


@app.on_event('shutdown')
async def shutdown_event():
    await app.state.client.aclose()


@app.get('/')
async def home():
    client = app.state.client
    req = client.build_request('GET', 'https://www.example.com/')
    r = await client.send(req, stream=True)
    # Stream the upstream response to the client and close it once streaming completes
    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))

Example (Updated)

Since startup and shutdown events have now been deprecated (and might be completely removed in the future), you could instead use a lifespan handler to initialise the httpx Client on startup and close the Client instance on shutdown, similar to what has been demonstrated in this answer. Starlette specifically provides an example using a lifespan handler and an httpx Client in its documentation. As described in Starlette's documentation:

The lifespan has the concept of state, which is a dictionary that can be used to share the objects between the lifespan, and the requests.

The state received on the requests is a shallow copy of the state received on the lifespan handler.

Hence, objects added to the state in the lifespan handler can be accessed inside endpoints using request.state. The example below uses a streaming response both to communicate with the external server and to stream the response back to the client. See here for more details on the async response streaming methods of httpx (i.e., aiter_bytes(), aiter_text(), aiter_lines(), etc.).

If you would like to use a media_type for the StreamingResponse, you could take the one from the original response like this: media_type=r.headers['content-type'] (a short variant demonstrating this follows the example below). However, as described in this answer, you need to make sure that the media_type is not set to text/plain; otherwise, the content would not stream as expected in the browser, unless you disable MIME sniffing (have a look at the linked answer for more details and solutions).

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialise the Client on startup and add it to the state
    async with httpx.AsyncClient() as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req, stream=True)
    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose)) 
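For instance, to forward the upstream media type (as mentioned earlier, and keeping in mind the text/plain caveat), the return line of the example above could become:

    return StreamingResponse(
        r.aiter_raw(),
        media_type=r.headers.get('content-type'),  # forward the upstream media type
        background=BackgroundTask(r.aclose)
    )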

If, for any reason, you need to read the content chunk by chunk on server side before responding back to the client, you could do this as follows:

@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req, stream=True)
    
    async def gen():
        try:
            async for chunk in r.aiter_raw():
                yield chunk
        finally:
            # Ensure the upstream response is closed, even if the client disconnects
            await r.aclose()
        
    return StreamingResponse(gen())

If you don't want to use a streaming response, but rather have httpx read the response for you in the first place (which would store the response data in the server's RAM; hence, you should make sure there is enough memory available to accommodate the data), you could use the following. Note that using r.json() should only apply to cases where the response data are in JSON format; otherwise, you could return a PlainTextResponse or a custom Response directly, as demonstrated below.

from fastapi import Response
from fastapi.responses import PlainTextResponse

@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req)
    # The content-type header may carry parameters, e.g. 'application/json; charset=utf-8'
    content_type = r.headers.get('content-type', '')

    if content_type.startswith('application/json'):
        return r.json()
    elif content_type.startswith('text/plain'):
        return PlainTextResponse(content=r.text)
    else:
        return Response(content=r.content)

Using the async API of httpx means that you have to define your endpoints with async def; otherwise, you would have to use the standard synchronous API (for def vs async def, see this answer), and, as described in this GitHub discussion:

Yes. HTTPX is intended to be thread-safe, and yes, a single client-instance across all threads will do better in terms of connection pooling, than using an instance-per-thread.
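A minimal sketch of the synchronous variant, assuming a single module-level shared Client (the endpoint path and URL are illustrative):

from fastapi import FastAPI, Response
import httpx

app = FastAPI()

# One shared synchronous Client; since httpx clients are thread-safe, a single
# instance can serve all the threadpool workers that run def endpoints
client = httpx.Client()


@app.get('/sync')
def home_sync():
    r = client.get('https://www.example.com')  # blocking call, runs in a threadpool thread
    return Response(content=r.content)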

You can also control the connection pool size using the limits keyword argument on the Client (see Pool limit configuration). For example:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)
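The same limits can equally be passed to the AsyncClient used in the lifespan examples above, for instance:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Apply the pool limits to the shared AsyncClient
    async with httpx.AsyncClient(limits=limits) as client:
        yield {'client': client}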
Constantia answered 15/9, 2022 at 18:56
Thank you for the advice. I will try it out right away. If it does the trick, I'll mark this as the answer. – Ahmednagar
No, sorry -- been a really busy week -- still trying to get to implementation. – Ahmednagar
Thank you for your detailed answer! I have understood the "startup-shutdown" events, and from FastAPI's tutorial I can see that FastAPI puts objects that should be shared during the lifespan in a dict, ml_models; when you need one, you just get it from the dict. But I can't quite understand your example in lifespan, which yields a dict and gets the client from request.state. I read the documentation of Starlette, but it is almost the same as your example. Is there a more detailed explanation of this? – Lockman
Actually, what I'd like to ask is: if I don't use request as a parameter, but instead use a query or body, how do I use lifespan as in your example? Or can I only use lifespan as in FastAPI's example, with a dict? – Lockman
@ChuangMen The request parameter can be used along with any other parameter. – Constantia
