"future cannot be sent between threads safely" when pass Arc<Mutex> into tokio::spawn
Asked Answered
R

2

6

I implemented TCP client using tokio. However, my code not compile because I got an error:

error: future cannot be sent between threads safely
   --> src/main.rs:81:9
    |
81  |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Option<tokio::net::TcpStream>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:90:42
    |
82  |             match stream.lock().unwrap().as_mut() {
    |                   ---------------------- has type `std::sync::MutexGuard<'_, Option<tokio::net::TcpStream>>` which is not `Send`
...
90  |                     stream.write(&packet).await.unwrap();
    |                                          ^^^^^^ await occurs here, with `stream.lock().unwrap()` maybe used later
...
94  |             };
    |              - `stream.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:82:19
    |
82  |             match stream.lock().unwrap().as_mut() {
    |                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: required by a bound in `tokio::spawn`
   --> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

This is my code where issue occurs:

async fn handle_write(&mut self) -> JoinHandle<()> {
    let stream = Arc::clone(&self.stream);
    let session = Arc::clone(&self.session);
    let queue = Arc::clone(&self.queue);

    tokio::spawn(async move {
        match stream.lock().unwrap().as_mut() {
            Some(stream) => {
                let packet: Vec<u8> = queue.lock().unwrap().pop_front().unwrap();
                let packet = match session.lock().unwrap().header_crypt.as_mut() {
                    Some(header_crypt) => header_crypt.encrypt(&packet),
                    _ => packet,
                };

                stream.write(&packet).await.unwrap();
                stream.flush().await.unwrap();
            },
            _ => {},
        };
    })
}

and same issue here:

async fn handle_read(&mut self) -> JoinHandle<()> {
    let queue = Arc::clone(&self.queue);
    let stream = Arc::clone(&self.stream);
    let session = Arc::clone(&self.session);

    tokio::spawn(async move {
        match stream.lock().unwrap().as_mut() {
            Some(stream) => {
                let mut buffer = [0u8; 4096];

                match stream.read(&mut buffer).await {
                    Ok(bytes_count) => {
                        let raw_data = match session.lock().unwrap().header_crypt.as_mut() {
                            Some(header_crypt) => header_crypt.decrypt(&buffer[..bytes_count]),
                            _ => buffer[..bytes_count].to_vec(),
                        };

                        queue.lock().unwrap().push_back(raw_data);
                    },
                    _ => {},
                };
            },
            _ => {},
        };
    })
}

Playground.

Could somebody explain, what am I doing wrong ?

P.S. just in case: I am using std::sync::{Arc, Mutex};

Rafa answered 14/6, 2022 at 15:27 Comment(8)
You aren't sending the mutex, but the lock guard -- and this clearly can't work, std::Mutex's invariant is that its lock owned by a single thread.Spectrophotometer
@MarkoTopolnik you mean I should use use tokio::sync::{Mutex}; instead ?Rafa
@SergioIvanuzzo for instance yes. The problem you have is that tokio can resume coroutines on different threads than it paused them, so you can't keep std locks across await when using tokio::spawn. Alternatively you can use spawn_local which runs entirely on the scheduler where it came from.Shelving
@Shelving is it good practice to call tokio::spawn_local inside tokio::spawn ?Rafa
@SergioIvanuzzo noShelving
Although there are workarounds, of sort.Shelving
@Shelving I need handle_read, handle_write and handle_queue to be processed independently in different threads. Could you advice what I can do to achieve this ?Rafa
Fixed: play.rust-lang.org/…Rafa
R
6

Finally, as decided in comments to my question, I used tokio::sync::Mutex instead of std::sync::Mutex. So, now code compiles correctly.

Playground.

Rafa answered 14/6, 2022 at 16:26 Comment(1)
This was very simple solution. std::sync::MutexGuard<'_, Option<tokio::net::TcpStream>> which is not Send was telling the same thing. If someone is not using the mutex inside the Arc then unsafe implementations of Send and Sync would suffice.Ori
V
0

In my case the problem was using ThreadRng with thread_rng() which is NOT thread safe. Just as a heads-up for anyone else banging their head against this error message. I refactored to using let mut rng = ::rand::rngs::StdRng::from_seed(OsRng.gen());

Vitebsk answered 5/10, 2022 at 12:11 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.