I'd like to spin up a Tokio event loop alongside a Rocket server, then add events to this loop later on. I read "Is there a way to launch a tokio::Delay on a new thread to allow the main loop to continue?", but it's still not clear to me how to achieve my goal.
How do I add tasks to a Tokio event loop that is running on another thread?
The returned handle can be used to spawn tasks that run on this runtime, and can be cloned to allow moving the Handle to other threads.
Here is an example of spinning up the event loop in one thread and having a second thread spawn tasks on it.
use futures::future; // 0.3.5
use std::{thread, time::Duration};
use tokio::{runtime::Runtime, time}; // 0.2.21

fn main() {
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    let (handle_tx, handle_rx) = std::sync::mpsc::channel();

    let tokio_thread = thread::spawn(move || {
        let mut runtime = Runtime::new().expect("Unable to create the runtime");

        eprintln!("Runtime created");

        // Give a handle to the runtime to another thread.
        handle_tx
            .send(runtime.handle().clone())
            .expect("Unable to give runtime handle to another thread");

        // Continue running until notified to shutdown
        runtime.block_on(async {
            shutdown_rx.await.expect("Error on the shutdown channel");
        });

        eprintln!("Runtime finished");
    });

    let another_thread = thread::spawn(move || {
        let handle = handle_rx
            .recv()
            .expect("Could not get a handle to the other thread's runtime");

        eprintln!("Another thread created");

        let task_handles: Vec<_> = (0..10)
            .map(|value| {
                // Run this future in the other thread's runtime
                handle.spawn(async move {
                    eprintln!("Starting task for value {}", value);
                    time::delay_for(Duration::from_secs(2)).await;
                    eprintln!("Finishing task for value {}", value);
                })
            })
            .collect();

        // Finish all pending tasks
        handle.block_on(async move {
            future::join_all(task_handles).await;
        });

        eprintln!("Another thread finished");
    });

    another_thread.join().expect("Another thread panicked");

    shutdown_tx
        .send(())
        .expect("Unable to shutdown runtime thread");

    tokio_thread.join().expect("Tokio thread panicked");
}
Runtime created
Another thread created
Starting task for value 0
Starting task for value 1
Starting task for value 2
Starting task for value 3
Starting task for value 4
Starting task for value 5
Starting task for value 6
Starting task for value 7
Starting task for value 8
Starting task for value 9
Finishing task for value 0
Finishing task for value 5
Finishing task for value 4
Finishing task for value 3
Finishing task for value 9
Finishing task for value 2
Finishing task for value 1
Finishing task for value 7
Finishing task for value 8
Finishing task for value 6
Another thread finished
Runtime finished
The solution for Tokio 0.1 is available in the revision history of this post.
Perhaps a larger issue for a general use case (including mine), how would one go about sharing the tx/rx channel between files? The obvious way is via a pub static, but lazy_static doesn't allow for destructuring a tuple, and the tuple's fields are private. – Holder
@Holder Yes, I'd expect the single transfer of the Handle to be very lightweight. I don't understand your other question; "files" aren't a meaningful unit here. You need to provide each thread with a copy of the Handle somehow, and that's going to vary based on how you spin them up. – Euh
Actually figured it out (sorta). What I ended up doing was creating a mutable static (Option<Handle>). It needs unsafe, but given the strict ordering of things, I don't see that as an issue. If the cloning becomes an issue, I'll certainly look into Rc and whatnot. – Holder
@Holder using a static mut is almost guaranteed to introduce undefined behavior into your program and you shouldn’t use it. You seem to be making it more complicated than it really needs to be. – Euh
this answer no longer compiles, i was told on irc that you can do this without channels with handles now – Altimeter
@JanusTroelsen updated. Neither answer requires channels for spawning the task. The channels are used to transfer the handle between threads and to tell the runtime when to exit. If you have a different organization, your code may not need either of those. – Euh
handle_rx.recv() should be relatively small, correct? Just thinking if I should use lazy_static or if dynamic access would be sufficient. – Holder
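On the static mut idea from the comments: if a process-wide handle really is wanted, one safe option is std::sync::OnceLock rather than a mutable static. The following is only a minimal sketch of that pattern using the standard library alone. The Handle type here is a hypothetical stand-in for tokio::runtime::Handle (a channel sender feeding one worker thread), and the names global_handle and squares_via_global_handle are made up for illustration; none of this is Tokio's API.

```rust
use std::sync::{mpsc, OnceLock};
use std::thread;

// A unit of work for the stand-in "runtime".
type Job = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical stand-in for tokio::runtime::Handle (an assumption for this
// sketch): cheap to clone and usable from any thread.
#[derive(Clone)]
struct Handle {
    jobs: mpsc::Sender<Job>,
}

impl Handle {
    // Queue a job to run on the worker thread.
    fn spawn(&self, job: impl FnOnce() + Send + 'static) {
        self.jobs
            .send(Box::new(job))
            .expect("worker thread has stopped");
    }
}

// Initialized at most once, then read-only: no `unsafe`, no `static mut`.
// Assumes Rust 1.72 or newer (OnceLock, and mpsc::Sender being Sync).
static HANDLE: OnceLock<Handle> = OnceLock::new();

// Returns the shared handle, starting the worker thread on first use.
fn global_handle() -> &'static Handle {
    HANDLE.get_or_init(|| {
        let (tx, rx) = mpsc::channel::<Job>();
        thread::spawn(move || {
            // Run jobs until every sender is gone.
            for job in rx {
                job();
            }
        });
        Handle { jobs: tx }
    })
}

// Several threads submit work through the global handle; the results are
// collected and sorted so the return value does not depend on scheduling.
fn squares_via_global_handle(n: u32) -> Vec<u32> {
    let (result_tx, result_rx) = mpsc::channel();

    let submitters: Vec<_> = (0..n)
        .map(|value| {
            let result_tx = result_tx.clone();
            thread::spawn(move || {
                global_handle().spawn(move || {
                    result_tx.send(value * value).expect("receiver dropped");
                });
            })
        })
        .collect();

    // Drop the original sender so the receiver ends once all jobs have run.
    drop(result_tx);

    for submitter in submitters {
        submitter.join().expect("submitter thread panicked");
    }

    let mut results: Vec<u32> = result_rx.iter().collect();
    results.sort();
    results
}
```

Calling squares_via_global_handle(4) returns [0, 1, 4, 9]. Since tokio::runtime::Handle is Clone, Send and Sync, a OnceLock<tokio::runtime::Handle> should be usable in the same way, but passing a cloned Handle into each thread, as in the answer, is usually simpler.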