How to create a dedicated threadpool for CPU-intensive work in Tokio?
Asked Answered
P

4

16

I have a Rust async server based on the Tokio runtime. It has to process a mix of latency-sensitive I/O-bound requests, and heavy CPU-bound requests.

I don't want to let the CPU-bound tasks monopolize the Tokio runtime and starve the I/O bound tasks, so I'd like to offload the CPU-bound tasks to a dedicated, isolated threadpool (isolation is the key here, so spawn_blocking/block_in_place on one shared threadpool are insufficient). How can I create such a threadpool in Tokio?

A naive approach of starting two runtimes runs into an error:

thread 'tokio-runtime-worker' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.'

use tokio; // 0.2.20

fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();
    let cpu_pool = cpu_pool.handle().clone(); // this is the fix/workaround!

    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}

Can Tokio allow two separate runtimes? Is there a better way to create an isolated CPU pool in Tokio?

Panter answered 12/5, 2020 at 13:25 Comment(3)
I'm aware of block_in_place, but it doesn't give isolation guarantee I'm looking for.Panter
isolation guarantee I'm looking for and what guarantee is that? Please edit your question to state all of your requirements.Dillion
@Dillion I think it's already there: "I don't want to let the CPU-bound task monopolize the Tokio runtime and starve the I/O bound tasks."Bicapsular
R
16

While Tokio already has a threadpool, the documentation of Tokio advises:

If your code is CPU-bound and you wish to limit the number of threads used to run it, you should run it on another thread pool such as rayon. You can use an oneshot channel to send the result back to Tokio when the rayon task finishes.

So, if you want to create a threadpool to make heavy use of CPU, a good way is to use a crate like Rayon and send the result back to the Tokio task.

Randeerandel answered 12/5, 2020 at 13:47 Comment(3)
spawn_blocking likewise says: to run your CPU-bound computations on only a few threads, you should use a separate thread pool such as rayon rather than configuring the number of blocking threads.Dillion
Use of rayon requires sending results back via async channels, which is quite cumbersome ;(Panter
@Panter I don't think channel is cumbersome, but whatever what difference with tokio ? I think tokio spawn to something quite similar. You need a way to send back the result. Don't forget that the point of multi threading is to use more CPU to do more work, but the overhead exist. The point is that people should only use multithreading if this overheat is worse the gain with multi threading.Randeerandel
P
8

Tokio's error message was misleading. The problem was due to Runtime object being dropped in an async context.

The workaround is to use Handle, not Runtime directly, for spawning tasks on the other runtime.

fn main() {
    let mut main_runtime = tokio::runtime::Runtime::new().unwrap();
    let cpu_pool = tokio::runtime::Builder::new().threaded_scheduler().build().unwrap();

    // this is the fix/workaround:
    let cpu_pool = cpu_pool.handle().clone(); 

    main_runtime.block_on(main_runtime.spawn(async move {
        cpu_pool.spawn(async {}).await
    }))
    .unwrap().unwrap();
}
Panter answered 12/5, 2020 at 22:23 Comment(1)
I fail to see the point to create another runtime in your context.Randeerandel
D
6

Starting a Tokio runtime already creates a threadpool. The relevant options are

Roughly speaking, core_threads controls how many threads will be used to process asynchronous code. max_threads - core_threads is how many threads will be used for blocking work (emphasis mine):

Otherwise as core_threads are always active, it limits additional threads (e.g. for blocking annotations) as max_threads - core_threads.

You can also specify these options through the tokio::main attribute.

You can then annotate blocking code with either of:

See also:

spawn_blocking can easily take all of the threads available in the one and only runtime, forcing other futures to wait on them

You can make use of techniques like a Semaphore to restrict maximum parallelism in this case.

Dillion answered 12/5, 2020 at 13:37 Comment(3)
This doesn't answer my question. I'm looking for a dedicated thread pool, which doesn't share threads with my main runtime. I'm already using block_in_place, but this ruins my latency and blocked threads starve network connections.Panter
@Panter please also explain why spawn_blocking doesn't accomplish the goal?Dillion
because I have many very long CPU-bound tasks, so spawn_blocking can easily take all of the threads available in the one and only runtime, forcing other futures to wait on them.Panter
L
0

One possibility not discussed here so far, is to create 2x threads in the main runtime, and use a semaphore for the CPU-bound tasks to ensure you don't start more than 1x CPU tasks concurrently. That leaves a guaranteed 1x threads to service non-blocking IO.

I would still use the default number of threads on a machine with hyper-threading, as you'll have num physical cores threads for each of IO and CPU bound tasks, which is probably optimal. But you can increase it if desired:

use tokio::runtime::Builder;
use tokio::sync::Semaphore;

// Half of the available threads used in the runtime
// Note: const_new requires the parking_lot feature.
static CPU_SEM: Semaphore = Semaphore::const_new(4);

fn main() {
    // build runtime with specified number of worker threads
    let runtime = Builder::new_multi_thread()
        .worker_threads(CPU_SEM.available_permits()*2)
        .build()
        .unwrap();

    runtime.block_on(runtime.spawn(async move {
       cpu_task().await        
    })).unwrap(); 
}

async fn cpu_task() {
    // The task will sleep until a permit is available
    let permit = CPU_SEM.acquire().await.unwrap();

    // Do CPU heavy work here, we won't starve the runtime

    // permit is dropped, letting another CPU task run
}

However, looking at the documentation, all the answers here, including this one seem dated. Tokio seems to support a separate blocking thread pool:

pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self

Specifies the limit for additional threads spawned by the Runtime.

These threads are used for blocking operations like tasks spawned through spawn_blocking. Unlike the worker_threads, they are not always active and will exit if left idle for too long. You can change this timeout duration with thread_keep_alive.

The default value is 512.

So now you can just use spawn_blocking.

Lewan answered 11/10, 2022 at 11:58 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.