FastAPI `run_in_threadpool` getting stuck [duplicate]
I have implemented all my routes using async def and have followed the guidelines from the FastAPI documentation.

Every route makes multiple DB calls, which do not have async support, so they are normal (blocking) functions like this:

def db_fetch(query):
    # blocking call; takes a few seconds to respond
    return ...

To avoid blocking my event loop I use fastapi.concurrency.run_in_threadpool.

Now the issue is, when a large number of requests comes in, new requests get blocked. Even if I close the browser tab (cancelling the request), the entire app stays stuck until the older requests have been processed.

What am I doing wrong here?

I use uvicorn as my ASGI server. I run in a Kubernetes cluster with 2 replicas.

A few suspects: am I spawning too many threads? Is it a bug within uvicorn? Not really sure!

Housewarming answered 31/1, 2022 at 14:52 Comment(6)
It could really be the number of threads. You could try to create a dedicated ThreadPool with a defined number of threads, and then use asyncio's run_in_executor with that predefined executor.Lindeman
Makes sense. I was going for that initially, but then I was suggested to use fastapi's run_in_threadpool as it uses AnyIO to manage threads, and takes care of housekeeping.Housewarming
That is what fastapi and starlette use under the hood anyio.readthedocs.io/en/stable/threads.html and they explicitly point out that too many threads might be an issueLindeman
If I create a dedicated threadpool, won't it cause the same issue?Housewarming
I think you can limit the number of worker threads docs.python.org/3/library/… So if you define that pool once as a global variable and use it, I think you can achieve this limit.Lindeman
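The approach discussed in the comments above would look roughly like this (an illustrative sketch; the pool size, helper names, and dummy db_fetch are my own):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# One bounded pool, defined once as a module-level global (size is illustrative)
DB_EXECUTOR = ThreadPoolExecutor(max_workers=10)


def db_fetch(query):
    # stand-in for a slow, blocking DB call
    time.sleep(0.1)
    return f"result for {query!r}"


async def fetch(query):
    loop = asyncio.get_running_loop()
    # run the blocking call on the bounded pool instead of the default executor
    return await loop.run_in_executor(DB_EXECUTOR, db_fetch, query)


print(asyncio.run(fetch("SELECT 1")))  # prints: result for 'SELECT 1'
```

Because the pool has a fixed max_workers, excess calls queue inside the executor instead of spawning unbounded threads.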
Does this answer your question? FastAPI runs api-calls in serial instead of parallel fashionMarbling
It is, as you suspected, an issue with too many threads. Under the hood, FastAPI uses Starlette, which in turn uses anyio's to_thread.run_sync. As described here, too many threads can lead to problems, and you can guard against that with a semaphore that sets an upper bound on the number of threads created. In code, that would read roughly like this:

# Core Library
from typing import TypeVar, Callable
from typing_extensions import ParamSpec
# Third party
from anyio import Semaphore
from starlette.concurrency import run_in_threadpool

# To avoid having too many threads running (which could happen with too many
# concurrent requests), we limit the number with a semaphore.
MAX_CONCURRENT_THREADS = 10
MAX_THREADS_GUARD = Semaphore(MAX_CONCURRENT_THREADS)
T = TypeVar("T")
P = ParamSpec("P")


async def run_async(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
    async with MAX_THREADS_GUARD:
        return await run_in_threadpool(func, *args, **kwargs)
Lindeman answered 31/1, 2022 at 16:9 Comment(1)
After adding this one, what would happen to my other requests? Now that threads are restricted, they'd all be in waiting state right? Or does starlette spawn a new threadpool?Housewarming
Another option: raise AnyIO's default thread limiter, which caps run_in_threadpool at 40 concurrent threads by default:

import threading
import time

from anyio.to_thread import current_default_thread_limiter
from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool

app = FastAPI()
# Raise AnyIO's default thread limiter (40 tokens by default) so up to
# 1000 blocking calls can run in worker threads concurrently.
current_default_thread_limiter().total_tokens = 1000


def test():
    # print the worker thread's id and simulate a slow blocking call
    print(threading.current_thread().native_id)
    time.sleep(10)
    return 'ok'


@app.get("/")
async def read_root():
    # offloaded to the (now much larger) default thread pool
    result = await run_in_threadpool(test)
    print(result)
    return {"Hello": "World"}
Camera answered 6/9, 2023 at 16:51 Comment(1)
Thank you for contributing to the Stack Overflow community. This may be a correct answer, but it’d be really useful to provide additional explanation of your code so developers can understand your reasoning. This is especially useful for new developers who aren’t as familiar with the syntax or struggling to understand the concepts. Would you kindly edit your answer to include additional details for the benefit of the community?Glauce
