Messaging between two tokio runtimes inside separate threads
Asked Answered
P

1

12

I ran into the kind of a problem described in this question: How can I create a Tokio runtime inside another Tokio runtime without getting the error "Cannot start a runtime from within a runtime"? .

Some good rust crates doesn't have asynchronous executor. I decided to put all such libraries calls in one thread which is tolerant of such operations. Another thread should be able to send non-blicking messages using tokio::channel.

I have programmed a demo stand to test implementation options. Call tokio::spawn inside of each runtime is made in order to understand a little more detail in tokio runtimes and handlers - it is a part of a question.

The question. Please correct me if I misunderstand something further.
There are two tokio runtimes. Each is launched in its own thread. Call tokio::spawn inside first_runtime() spawns task on first runtime. Call tokio::spawn inside second_runtime() spawns task on second runtime. There is a tokio::channel between these two tasks. Call tx.send(...).await does not block sending thread if channel buffer is not full, even if receiving thread is blocked by thread::sleep() call.
Am I getting everything right? The output of this code tells me that I'm right, but I need confirmation of my reasoning.

use std::thread;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, Receiver, channel}; // 1.12.0


#[tokio::main(worker_threads = 1)]
#[allow(unused_must_use)]
async fn first_runtime(tx: Sender<String>) {
    thread::sleep(Duration::from_secs(1));
    println!("first thread woke up");
    tokio::spawn(async move {
        for msg_id in 0..10 {
            if let Err(e) = tx.send(format!("message {}", msg_id)).await {
                eprintln!("[ERR]: {}", e);
            } else {
                println!("message {} send", msg_id);
            }
        }
    }).await;
    println!("first thread finished");
}


#[tokio::main(worker_threads = 1)]
#[allow(unused_must_use)]
async fn second_runtime(mut rx: Receiver<String>) {
    thread::sleep(Duration::from_secs(3));
    println!("second thread woke up");
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("{} received", msg);
        }
    }).await;
    println!("second thread finished");
}


fn main() {
    let (tx, rx) = channel::<String>(5);
    thread::spawn(move || { first_runtime(tx); });
    second_runtime(rx);
}

Photodrama answered 27/10, 2021 at 9:23 Comment(1)
Looks good to me :), but I'm not an expert yet...Vasta
C
1

You are right

here is the output of the code that I got:

first thread woke up
message 0 send
message 1 send
message 2 send
message 3 send
message 4 send          <-- after this, the queue if full, cannot send anymore yet
second thread woke up
message 0 received
message 1 received
message 2 received
message 3 received
message 4 received
message 5 send          <-- can resume sending message after rx consum some messages
message 5 received
message 6 send
message 7 send
message 8 send
message 9 send
message 6 received
message 7 received
message 8 received
message 9 received
first thread finished
second thread finished

tokio::spawn

however you don't actually need to use tokio::spawn for this case.

since that you immediatly await the handle after it is spawn

use std::thread;
use std::time::Duration;
use tokio::sync::mpsc::{channel, error::TrySendError, Receiver, Sender};

#[tokio::main(worker_threads = 1)]
#[allow(unused_must_use)]
async fn first_runtime(tx: Sender<String>) {
    thread::sleep(Duration::from_secs(1));
    println!("first thread woke up");
    for msg_id in 0..10 {
        if let Err(e) = tx.send(format!("message {}", msg_id)).await {
            eprintln!("[ERR]: {}", e);
        } else {
            println!("message {} send", msg_id);
        }
    }
    println!("first thread finished");
}

#[tokio::main(worker_threads = 1)]
#[allow(unused_must_use)]
async fn second_runtime(mut rx: Receiver<String>) {
    thread::sleep(Duration::from_secs(3));
    println!("second thread woke up");
    while let Some(msg) = rx.recv().await {
        println!("{} received", msg);
    }
    println!("second thread finished");
}

fn main() {
    let (tx, rx) = channel::<String>(5);
    thread::spawn(move || {
        first_runtime(tx);
    });
    second_runtime(rx);
}

|   for normal runtime
||  for tokio runtime
||* for tokio runtime on sleep
||- for tokio runtime on await

|main thread
|
|--------------------|  thread spawn
|                    || first runtime
|main thread         ||*
|| second runtime    ||*
||*                  ||*
||*   < message      ||
||*   < message      ||
||*   < message      ||
||*   < message      ||
||*   < message      ||
||*                  ||-
||*                  ||-
|| recive message    ||-
||    < message      ||
|| recive message    ||-
|| recive message    ||-
||    < message      ||
||    < message      ||
|| recive message    ||-
.
.
.

try_send

you can use try_send to see how the program really run

use std::thread;
use std::time::Duration;
use tokio::sync::mpsc::{channel, error::TrySendError, Receiver, Sender};

#[tokio::main(worker_threads = 1)]
async fn first_runtime(tx: Sender<String>) {
    thread::sleep(Duration::from_secs(1));
    println!("first thread woke up");
    for msg_id in 0..10 {
        loop{
            match tx.try_send(format!("message {}", msg_id)){
                Ok(()) => {
                    println!("message {} send", msg_id);
                    break;
                },
                Err(TrySendError::Full(_)) => {
                    println!("The queue is full >.<");
                },
                Err(TrySendError::Closed(_)) => {
                    eprintln!("[ERR] Channel is closed");
                }
            }
            std::thread::sleep(std::time::Duration::from_millis(250));
        }
    }
    println!("first thread finished");
}

#[tokio::main(worker_threads = 1)]
async fn second_runtime(mut rx: Receiver<String>) {
    thread::sleep(Duration::from_secs(3));
    println!("second thread woke up");
    while let Some(msg) = rx.recv().await {
        println!("{} received", msg);
    }
    println!("second thread finished");
}

fn main() {
    let (tx, rx) = channel::<String>(5);
    thread::spawn(move || {
        first_runtime(tx);
    });
    second_runtime(rx);
}

Output

first thread woke up
message 0 send
message 1 send
message 2 send
message 3 send
message 4 send
The queue is full >.<
The queue is full >.<
The queue is full >.<
The queue is full >.<
The queue is full >.<
The queue is full >.<
The queue is full >.<
The queue is full >.<
second thread woke up
message 0 received
message 1 received
message 2 received
message 3 received
message 4 received
message 5 send
message 6 send
message 7 send
message 8 send
message 9 send
first thread finished
message 5 received
message 6 received
message 7 received
message 8 received
message 9 received
second thread finished
Cerebrovascular answered 20/8, 2024 at 9:26 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.