FastAPI is fully compatible with (and based on) Starlette, and hence, with FastAPI you get all of Starlette's features, such as the run_in_threadpool()
method. Starlette's run_in_threadpool()
, which uses anyio.to_thread.run_sync()
behind the scenes, "will run the sync blocking function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked"—see this answer and AnyIO's Working with threads documentation for more details. Calling run_in_threadpool()
—which internally calls anyio.to_thread.run_sync()
, and subsequently, AsyncIOBackend.run_sync_in_worker_thread()
—will return a coroutine that will then be await
ed to get the eventual result of the sync function (e.g., result = await run_in_threadpool(...)
), and hence, FastAPI will still work asynchronously (instead of calling that synchronous function directly, which would block the event loop that runs in the main thread, and hence, the main thread would get blocked as well). As can be seen in Starlette's source code (link is given above), the run_in_threadpool()
function simply looks like this (supporting both sequence and keyword arguments):
async def run_in_threadpool(
    func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
    if kwargs:  # pragma: no cover
        # run_sync doesn't accept 'kwargs', so bind them in here
        func = functools.partial(func, **kwargs)
    return await anyio.to_thread.run_sync(func, *args)
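For instance, a minimal usage sketch (the resize_image() function below is made up for illustration) could look like this; both positional and keyword arguments are simply passed through to the sync function:

import asyncio
from starlette.concurrency import run_in_threadpool  # also re-exported by fastapi.concurrency


def resize_image(path: str, *, width: int = 128) -> str:  # hypothetical blocking function
    # ... blocking IO/CPU work would go here ...
    return f"{path} resized to {width}px"


async def main():
    # the sync function runs in a worker thread; the coroutine is awaited for its result
    result = await run_in_threadpool(resize_image, "photo.png", width=256)
    print(result)


asyncio.run(main())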
As described in AnyIO's documentation:
Adjusting the default maximum worker thread count
The default AnyIO worker thread limiter has a value of 40
, meaning
that any calls to to_thread.run_sync()
without an explicit limiter
argument will cause a maximum of 40
threads to be spawned. You can
adjust this limit like this:
from anyio import to_thread
async def foo():
    # Set the maximum number of worker threads to 60
    to_thread.current_default_thread_limiter().total_tokens = 60
Note
AnyIO’s default thread pool limiter does not affect the default thread pool executor on asyncio.
Since FastAPI uses Starlette's concurrency
module to run blocking functions in an external threadpool (the same threadpool is also used by FastAPI to run endpoints defined with normal def
instead of async def
, as described in this answer), the default value of the thread limiter, as shown above, is applied here as well, i.e., 40
threads maximum—see the relevant AsyncIOBackend.current_default_thread_limiter()
method that returns the CapacityLimiter
with the default number of threads. Hence, sending 50
requests simultaneously, as in your case, would lead to threadpool starvation, meaning that there wouldn't be enough threads available in the threadpool to handle all incoming requests concurrently.
As described earlier, one can adjust that value, thus increasing the number of threads, which might lead to an improvement in performance results—always depending on the number of requests to def
endpoints (or async def
endpoints that call run_in_threadpool() internally) your API is expected to serve concurrently. For instance, if you expect the API to serve no more than 50 requests at a time to such endpoints, then set the maximum number of threads to 50. Note: If your FastAPI application also uses synchronous/blocking background tasks and/or StreamingResponse
's generators (i.e., functions defined with normal def
instead of async def
) and/or UploadFile
's async
methods, such as read
/write
/etc. (all these methods call the corresponding synchronous def
file methods underneath, using run_in_threadpool()
), you could then increase the number of threads as required, as FastAPI actually runs all those functions in the same external threadpool as well—it is all explained in this answer in detail.
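As a rough sketch of that point (the /upload path and the write_audit_log() task below are made up for illustration; the endpoint requires python-multipart to be installed), both the def background task and the await file.read() call would consume worker threads from that same AnyIO threadpool:

from fastapi import FastAPI, File, UploadFile, BackgroundTasks
import time

app = FastAPI()


def write_audit_log(entry: str) -> None:  # hypothetical blocking background task (normal def)
    time.sleep(1)  # stands in for blocking IO, e.g., writing to a file or database


@app.post("/upload")
async def upload(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    # UploadFile's async read() calls the sync file read via run_in_threadpool() underneath
    contents = await file.read()
    # def background tasks are also run by FastAPI in the same external threadpool
    background_tasks.add_task(write_audit_log, f"received {len(contents)} bytes")
    return {"size": len(contents)}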
Note that using the approach below, which was described here, would have the same effect on adjusting the number of worker threads:
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
RunVar("_default_thread_limiter").set(CapacityLimiter(60))
But, it would be best to follow the approach provided by AnyIO's official documentation (as shown earlier). It is also a good idea to have this done when the application starts up, using a lifespan
event handler, as demonstrated here.
In the working example below, since the /sync
endpoint is defined with normal def
instead of async def
, FastAPI will run it in a separate thread from the external threadpool and await
it, thus ensuring the event loop (and hence, the main thread and the entire server) does not get blocked due to the blocking operations (either blocking IO-bound or CPU-bound) that will be performed inside that endpoint.
Working Example 1
from fastapi import FastAPI
from contextlib import asynccontextmanager
from anyio import to_thread
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
    to_thread.current_default_thread_limiter().total_tokens = 60
    yield

app = FastAPI(lifespan=lifespan)


@app.get("/sync")
def test_sync() -> None:
    time.sleep(3)
    print("sync")


@app.get('/get_available_threads')
async def get_available_threads():
    return to_thread.current_default_thread_limiter().available_tokens
Using ApacheBench, you could test the example above as follows, which will send 1000
requests in total with 50
being sent simultaneously at a time (-n
: Number of requests, -c
: Number of concurrent requests):
ab -n 1000 -c 50 "http://localhost:8000/sync"
While running a performance test on the example above, if you call the /get_available_threads
endpoint from your browser, e.g., http://localhost:8000/get_available_threads
, you would see that the number of threads available is always 10 or above (since only 50 threads are used at a time in this test, but the thread limiter was set to 60
), meaning that setting the maximum number of threads on AnyIO's thread limiter to a number that is well above your needs, like 200
as shown in some other answer and in your recent example, wouldn't bring about any improvements in the performance; on the contrary, you would end up with a number of threads "sitting" there without being used. As explained earlier, the number of maximum threads should depend on (1) the number of requests your API is expected to serve concurrently, (2) any additional blocking tasks/functions that would run in the threadpool by FastAPI itself under the hood, as well as (3) the server machine's resources available.
The example below is the same as the one above, but instead of letting FastAPI itself handle the blocking operation(s) inside the def
endpoint (by running the def
endpoint in the external threadpool and await
ing it), the endpoint is now defined with async def
(meaning that FastAPI will run it directly in the event loop), and inside the endpoint, run_in_threadpool()
(which returns an await
able) is used to run the blocking operation (i.e., time.sleep()
in the example). Performing a benchmark test on the example below would yield similar results to the previous example.
Working Example 2
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from contextlib import asynccontextmanager
from anyio import to_thread
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
    to_thread.current_default_thread_limiter().total_tokens = 60
    yield

app = FastAPI(lifespan=lifespan)


@app.get("/sync_async_run_in_tp")
async def test_sync_async_with_run_in_threadpool() -> None:
    await run_in_threadpool(time.sleep, 3)
    print("sync_async using FastAPI's run_in_threadpool")


@app.get('/get_available_threads')
async def get_available_threads():
    return to_thread.current_default_thread_limiter().available_tokens
Using ApacheBench, you could test the example above as follows:
ab -n 1000 -c 50 "http://localhost:8000/sync_async_run_in_tp"
When using asyncio
's loop.run_in_executor()
—after obtaining the running event loop using asyncio.get_running_loop()
—one could pass None
to the executor
argument, which would lead to the default executor being used; that is, a ThreadPoolExecutor
. Note that when calling loop.run_in_executor()
and passing None
to the executor
argument, this does not create a new instance of a ThreadPoolExecutor
every time you do that; instead, a ThreadPoolExecutor
is only initialised once the first time you do that, but for subsequent calls to loop.run_in_executor()
with passing None
to the executor
argument, Python reuses that very same instance of ThreadPoolExecutor
(hence, the default executor). This can be seen in the source code of loop.run_in_executor(). That means the number of threads that can be created, when calling await loop.run_in_executor(None, ...)
, is limited to the default number of thread workers in the ThreadPoolExecutor
class.
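For reference, such a call (inside a coroutine) would look like the sketch below; passing None selects the event loop's default ThreadPoolExecutor, which is created lazily on first use:

import asyncio
import time


async def main():
    loop = asyncio.get_running_loop()
    # None -> use the loop's default ThreadPoolExecutor
    await loop.run_in_executor(None, time.sleep, 3)


asyncio.run(main())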
As described in the documentation of ThreadPoolExecutor
—and as shown in its implementation here—by default, the max_workers
argument is set to None
, in which case, the number of worker threads is set based on the following equation: min(32, os.cpu_count() + 4)
. The os.cpu_count()
function returns the number of logical CPUs in the current system. As explained in this article, physical cores refer to the CPU cores provided in the hardware (e.g., the chips), while logical cores are the CPU cores after hyperthreading is taken into account. If, for instance, your machine has 4 physical cores, each with hyperthreading (most modern CPUs have this), then Python will see 8 CPUs and will allocate 12 threads (8 CPUs + 4) to the pool by default (Python limits the number of threads to 32 to "avoid consuming surprisingly large resources on multi-core machines"; however, one could always adjust the max_workers
argument on their own when using a custom ThreadPoolExecutor
, instead of using the default one). You could check the default number of worker threads on your system as follows:
import concurrent.futures
# create a thread pool with the default number of worker threads
pool = concurrent.futures.ThreadPoolExecutor()
# report the number of worker threads chosen by default
# Note: `_max_workers` is a protected variable and may change in the future
print(pool._max_workers)
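Equivalently, you could compute that documented default directly (the or 1 fallback below is just a guard in case os.cpu_count() returns None):

import os

# documented default for ThreadPoolExecutor's max_workers: min(32, os.cpu_count() + 4)
print(min(32, (os.cpu_count() or 1) + 4))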
Now, as shown in your original example, you are not using a custom ThreadPoolExecutor
, but instead using the default ThreadPoolExecutor
every time a request arrives, by calling await loop.run_in_executor(None, time.sleep, 3)
(inside the sync_async_func()
function, which is triggered by the /test/sync_async
endpoint). Assuming your machine has 4 physical cores with hyperthreading enabled (as explained in the example earlier), then the default number of worker threads for the default ThreadPoolExecutor
would be 12. That means, based on your original example and the /test/sync_async
endpoint that triggers the await loop.run_in_executor(None, time.sleep, 3) call, your application could only handle 12 such concurrent requests at a time. That is the main reason for the difference observed in the performance results when compared to using run_in_threadpool()
, which comes with 40
allocated threads by default. Even though, in both cases, threadpool starvation occurred when sending 50
requests simultaneously, the endpoint (in your example) that uses run_in_threadpool()
performed better only because the default number of threads created was greater than the one used by the default ThreadPoolExecutor
(in your other endpoint).
One way to solve this is to create a new instance of ThreadPoolExecutor
(on your own, instead of using the default executor) every time a request arrives and have it terminated once the task is completed (using the with
statement), as shown below:
import concurrent.futures
import asyncio
import time

loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    await loop.run_in_executor(pool, time.sleep, 3)
Although the above should work just fine, it would be best to instantiate a ThreadPoolExecutor
once at application startup, adjust the number of worker threads as needed, and reuse the executor when required. Having said that, depending on the blocking task and/or external libraries you might be using for that task, if you ever encounter a memory leak—i.e., memory that is no longer needed, but is not released—after tasks are completed when reusing a ThreadPoolExecutor
, you might find creating a new instance of ThreadPoolExecutor
each time, as shown above, more suitable. Note, however, that if this was a ProcessPoolExecutor
instead, creating and destroying many processes over and over could become computationally expensive. Creating and destroying too many threads could consume a considerable amount of memory as well.
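Wrapped in an endpoint, the per-request variant sketched above might look roughly like this (the /per_request_pool path is made up for illustration):

from fastapi import FastAPI
import concurrent.futures
import asyncio
import time

app = FastAPI()


@app.get("/per_request_pool")
async def per_request_pool() -> None:
    loop = asyncio.get_running_loop()
    # a short-lived executor, created for this request and shut down when the with block exits
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        await loop.run_in_executor(pool, time.sleep, 3)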
Below is a complete working example, demonstrating how to create a reusable custom ThreadPoolExecutor
. Calling the /get_active_threads
endpoint from your browser, e.g., http://localhost:8000/get_active_threads
, while running a performance test with ApacheBench (using 50
concurrent requests, as described in your question and as shown below), you would see that the number of active threads never goes above 51
(50 concurrent threads + 1, which is the main thread), despite setting the max_workers
argument to 60
in the example below. This is simply because, in this performance test, the application is never required to serve more than 50
requests at the same time. Also, ThreadPoolExecutor
won't spawn new threads, if idle threads are available (thus, saving resources)—see the relevant implementation part. Hence, again, initialising the ThreadPoolExecutor
with max_workers=100
, as shown in your recent update, would be unnecessary if you never expect your FastAPI application to serve more than 50 requests at a time (to endpoints where that ThreadPoolExecutor
is used).
Working Example
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import concurrent.futures
import threading
import asyncio
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=60)
    yield {'pool': pool}
    pool.shutdown()

app = FastAPI(lifespan=lifespan)


@app.get("/sync_async")
async def test_sync_async(request: Request) -> None:
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(request.state.pool, time.sleep, 3)
    print("sync_async")


@app.get('/get_active_threads')
async def get_active_threads():
    return threading.active_count()
Using ApacheBench, you could test the example above as follows:
ab -n 1000 -c 50 "http://localhost:8000/sync_async"
Final Notes
In general, you should always aim to use asynchronous code (i.e., using async
/await
), wherever possible, as async
code, also known as coroutines, runs directly in the event loop—the event loop runs in the main thread and executes all tasks in that thread. That means there is only one thread that can take a lock on the interpreter; thus, avoiding the additional overhead of context switching (i.e., the CPU jumping from one thread of execution to another). When dealing with sync blocking IO-bound tasks though, you could either (1) define your endpoint with def
and let FastAPI handle it behind the scenes as described earlier, as well as in this answer, or (2) define your endpoint with async def
and use run_in_threadpool()
on your own to run that blocking task in a separate thread and await
it, or (3) define your endpoint with async def
and use asyncio
's loop.run_in_executor()
with a custom (preferably reusable) ThreadPoolExecutor
, adjusting the number of worker threads as required. When required to perform blocking CPU-bound tasks, while running such tasks in a separate thread from an external threadpool and await
ing them would successfully prevent the event loop from getting blocked, it wouldn't, however, provide the performance improvement you would expect from running code in parallel. Thus, for CPU-bound tasks, one may choose to use a ProcessPoolExecutor
instead (Note: when using processes in general, you need to explicitly protect the entry point with if __name__ == '__main__'
)—example on using a ProcessPoolExecutor
can be found in this answer.
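As a rough sketch of that option (not taken from the linked answer; the cpu_bound() function and /cpu path below are placeholders), one could reuse a single ProcessPoolExecutor through the lifespan handler, similar to the thread-based example above:

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import concurrent.futures
import asyncio
import uvicorn


def cpu_bound(n: int) -> int:  # placeholder for heavy CPU-bound work
    return sum(i * i for i in range(n))


@asynccontextmanager
async def lifespan(app: FastAPI):
    # reuse one process pool for the application's lifetime (workers are spawned lazily)
    pool = concurrent.futures.ProcessPoolExecutor()
    yield {'pool': pool}
    pool.shutdown()

app = FastAPI(lifespan=lifespan)


@app.get("/cpu")
async def cpu(request: Request, n: int = 10_000_000):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(request.state.pool, cpu_bound, n)


if __name__ == '__main__':
    # the entry point must be guarded when spawning processes
    uvicorn.run(app)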
To run tasks in the background, without waiting for them to complete in order to proceed with executing the rest of the code in an endpoint, you could use FastAPI's BackgroundTasks
, as shown here and here. If the background task function is defined with async def
, FastAPI will run it directly in the event loop, whereas if it is defined with normal def
, FastAPI will use run_in_threadpool()
and await
the returned coroutine (same concept as API endpoints). Another option when you need to run an async def
function in the background, but not necessarily having it triggered after returning a FastAPI response (which is the case with BackgroundTasks
), is to use asyncio.create_task()
, as shown in this answer and this answer. If you need to perform heavy background computation and you don't necessarily need it to be run by the same process, you may benefit from using other bigger tools such as Celery.
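As a minimal illustration of the asyncio.create_task() option (the notify() coroutine and the /fire_and_forget path below are made up for illustration), note that keeping a reference to the created task prevents it from being garbage-collected before it finishes:

from fastapi import FastAPI
import asyncio

app = FastAPI()

# keep references to created tasks so they aren't garbage-collected mid-flight
background_tasks = set()


async def notify(msg: str) -> None:  # hypothetical async background job
    await asyncio.sleep(3)
    print(msg)


@app.get("/fire_and_forget")
async def fire_and_forget():
    task = asyncio.create_task(notify("done"))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return {"status": "task scheduled"}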
Finally, regarding the optimal/maximum number of worker threads, I would suggest reading this article (have a look at this article as well for more details on ThreadPoolExecutor
in general). As explained in the article:
It is important to limit the number of worker threads in the thread
pools to the number of asynchronous tasks you wish to complete, based
on the resources in your system, or on the number of resources you
intend to use within your tasks.
Alternately, you may wish to increase the number of worker threads
dramatically, given the greater capacity in the resources you intend
to use.
[...]
It is common to have more threads than CPUs (physical or logical) in
your system. The reason for this is that threads are used for IO-bound tasks, not
CPU-bound tasks. This means that threads are used for tasks that wait
for relatively slow resources to respond, like hard drives, DVD
drives, printers, network connections, and much more.
Therefore, it is not uncommon to have tens, hundreds and even
thousands of threads in your application, depending on your specific
needs. It is unusual to have more than one or a few thousand threads.
If you require this many threads, then alternative solutions may be
preferred, such as AsyncIO.
Also, in the same article:
Does the Number of Threads in the ThreadPoolExecutor
Match the Number of CPUs or Cores?
The number of worker threads in the ThreadPoolExecutor
is not
related to the number of CPUs or CPU cores in your system.
You can configure the number of threads based on the number of
tasks you need to execute, the amount of local system resources you
have available (e.g., memory), and the limitations of resources you
intend to access within your tasks (e.g., connections to remote
servers).
How Many Threads Should I Use?
If you have hundreds of tasks, you should probably set the number of
threads to be equal to the number of tasks.
If you have thousands of tasks, you should probably cap the number of
threads at hundreds or 1,000.
If your application is intended to be executed multiple times in the
future, you can test different numbers of threads and compare overall
execution time, then choose a number of threads that gives
approximately the best performance. You may want to mock the task in
these tests with a random sleep operation.
What Is the Maximum Number of Worker Threads in the ThreadPoolExecutor?
There is no maximum number of worker threads in the ThreadPoolExecutor.
Nevertheless, your system will have an upper limit of the number of
threads you can create based on how much main memory (RAM) you have
available.
Before you exceed main memory, you will reach a point of diminishing
returns in terms of adding new threads and executing more tasks. This
is because your operating system must switch between the threads,
called context switching. With too many threads active at once, your
program may spend more time context switching than actually executing
tasks.
A sensible upper limit for many applications is hundreds of threads to
perhaps a few thousand threads. More than a few thousand threads on a
modern system may result in too much context switching, depending on
your system and on the types of tasks that are being executed.