How do I add tasks to a Tokio event loop that is running on another thread?

I'd like to spin up a Tokio event loop alongside a Rocket server, then add events to this loop later on. I read "Is there a way to launch a tokio::Delay on a new thread to allow the main loop to continue?", but it's still not clear to me how to achieve my goal.

Euh answered 10/2, 2019 at 3:6 Comment(0)

As the documentation for Runtime::handle states:

The returned handle can be used to spawn tasks that run on this runtime, and can be cloned to allow moving the Handle to other threads.

Here is an example of spinning up the event loop in one thread and having a second thread spawn tasks on it.

use futures::future; // 0.3.5
use std::{thread, time::Duration};
use tokio::{runtime::Runtime, time}; // 0.2.21

fn main() {
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    let (handle_tx, handle_rx) = std::sync::mpsc::channel();

    let tokio_thread = thread::spawn(move || {
        let mut runtime = Runtime::new().expect("Unable to create the runtime");

        eprintln!("Runtime created");

        // Give a handle to the runtime to another thread.
        handle_tx
            .send(runtime.handle().clone())
            .expect("Unable to give runtime handle to another thread");

        // Continue running until notified to shutdown
        runtime.block_on(async {
            shutdown_rx.await.expect("Error on the shutdown channel");
        });

        eprintln!("Runtime finished");
    });

    let another_thread = thread::spawn(move || {
        let handle = handle_rx
            .recv()
            .expect("Could not get a handle to the other thread's runtime");

        eprintln!("Another thread created");

        let task_handles: Vec<_> = (0..10)
            .map(|value| {
                // Run this future in the other thread's runtime
                handle.spawn(async move {
                    eprintln!("Starting task for value {}", value);
                    time::delay_for(Duration::from_secs(2)).await;
                    eprintln!("Finishing task for value {}", value);
                })
            })
            .collect();

        // Finish all pending tasks
        handle.block_on(async move {
            future::join_all(task_handles).await;
        });

        eprintln!("Another thread finished");
    });

    another_thread.join().expect("Another thread panicked");

    shutdown_tx
        .send(())
        .expect("Unable to shutdown runtime thread");

    tokio_thread.join().expect("Tokio thread panicked");
}

Runtime created
Another thread created
Starting task for value 0
Starting task for value 1
Starting task for value 2
Starting task for value 3
Starting task for value 4
Starting task for value 5
Starting task for value 6
Starting task for value 7
Starting task for value 8
Starting task for value 9
Finishing task for value 0
Finishing task for value 5
Finishing task for value 4
Finishing task for value 3
Finishing task for value 9
Finishing task for value 2
Finishing task for value 1
Finishing task for value 7
Finishing task for value 8
Finishing task for value 6
Another thread finished
Runtime finished

The solution for Tokio 0.1 is available in the revision history of this post.

Euh answered 10/2, 2019 at 3:6 Comment(7)
The overhead for calling handle_rx.recv() should be relatively small, correct? Just thinking if I should use lazy_static or if dynamic access would be sufficient. - Holder
Perhaps a larger issue for a general use case (including mine), how would one go about sharing the tx/rx channel between files? The obvious way is via a pub static, but lazy_static doesn't allow for destructuring a tuple, and the tuple's fields are private. - Holder
@Holder Yes, I'd expect the single transfer of the Handle to be very lightweight. I don't understand your other question; "files" aren't a meaningful unit here. You need to provide each thread with a copy of the Handle somehow, and that's going to vary based on how you spin them up. - Euh
Actually figured it out (sorta). What I ended up doing was creating a mutable static (Option<Handle>). It needs unsafe, but given the strict ordering of things, I don't see that as an issue. If the cloning becomes an issue, I'll certainly look into Rc and whatnot. - Holder
@Holder using a static mut is almost guaranteed to introduce undefined behavior into your program and you shouldn’t use it. You seem to be making it more complicated than it really needs to be. - Euh
This answer no longer compiles. I was told on IRC that you can now do this with handles, without channels. - Altimeter
@JanusTroelsen updated. Neither answer requires channels for spawning the task. The channels are used to transfer the handle between threads and to tell the runtime when to exit. If you have a different organization, your code may not need either of those. - Euh
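
On the static mut discussion in the comments: one safe way to get a write-once global is std::sync::OnceLock (Rust 1.70 or later), which needs no unsafe. The sketch below is standard-library only, so the Handle type in it is a hypothetical stand-in for tokio::runtime::Handle, and install_handle and runtime_handle are illustrative names. In a real program the slot would presumably hold the actual Tokio Handle, which is likewise cheap to clone and shareable across threads.

```rust
use std::sync::{Arc, Mutex, OnceLock};
use std::thread;

/// Hypothetical stand-in for `tokio::runtime::Handle`:
/// cheap to clone and safe to share between threads.
#[derive(Clone, Default)]
struct Handle {
    submitted: Arc<Mutex<Vec<u32>>>,
}

impl Handle {
    /// Records a task id; a real handle would schedule a future instead.
    fn spawn(&self, id: u32) {
        self.submitted.lock().unwrap().push(id);
    }
}

/// Process-wide slot, written once during startup and only read afterwards.
static RUNTIME_HANDLE: OnceLock<Handle> = OnceLock::new();

/// Stores the handle; returns false if one was already installed.
fn install_handle(handle: Handle) -> bool {
    RUNTIME_HANDLE.set(handle).is_ok()
}

/// Returns a clone of the installed handle, or None before installation.
fn runtime_handle() -> Option<Handle> {
    RUNTIME_HANDLE.get().cloned()
}

fn main() {
    let handle = Handle::default();
    assert!(install_handle(handle.clone()));

    // Any thread can now fetch its own clone without unsafe code.
    let workers: Vec<_> = (0..4)
        .map(|id| {
            thread::spawn(move || {
                runtime_handle().expect("handle not installed").spawn(id);
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("worker panicked");
    }

    let mut seen = handle.submitted.lock().unwrap().clone();
    seen.sort();
    println!("{:?}", seen);
}
```

Passing a cloned Handle explicitly to each thread, as the answer does, avoids the global entirely and is usually the simpler option.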
