Error on Future generator closure: Captured variable cannot escape `FnMut` closure body

I want to create a simple websocket server. I want to process the incoming messages and send a response, but I get an error:

error: captured variable cannot escape `FnMut` closure body
  --> src\main.rs:32:27
   |
32 |       incoming.for_each(|m| async {
   |  _________________________-_^
   | |                         |
   | |                         inferred to be a `FnMut` closure
33 | |         match m {
34 | |             // Error here...
35 | |             Ok(message) => do_something(message, db, &mut outgoing).await,
36 | |             Err(e) => panic!(e)
37 | |         }
38 | |     }).await;
   | |_____^ returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

This gives a few hits on Stack Overflow but I don't see anywhere in my code where a variable is escaping. The async block won't run concurrently, so I don't see any problem. Furthermore, I feel like I am doing something very simple: I get a type which allows me to send data back to the client, but when using a reference to it in the async block, it gives a compile error. The error only occurs when I use the outgoing or db variable in the async code.

This is my code (error is in the handle_connection function):

main.rs

use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;

struct DatabaseConnection;

#[tokio::main]
async fn main() -> Result<(), ()> {
    listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}

async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
    let try_socket = TcpListener::bind(address).await;
    let mut listener = try_socket.expect("Failed to bind on address");

    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(stream, addr, db.clone()));
    }

    Ok(())
}

async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
    let db = &*db;
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();

    let (mut outgoing, incoming) = ws_stream.split();

    // Adding `move` does not work either
    incoming.for_each(|m| async {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e)
        }
    }).await;
}

async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
    // Do something...

    // Send some message
    let _ = outgoing.send(Message::Text("yay".to_string())).await;
}

Cargo.toml

[dependencies]
futures = "0.3.*"
futures-channel = "0.3.*"
futures-util = "0.3.*"
tokio = { version = "0.2.*", features = [ "full" ] }
tokio-tungstenite = "0.10.*"
tungstenite = "0.10.*"

When using async move, I get the following error:

code

incoming.for_each(|m| async move {
    let x = &mut outgoing;
    let b = db;
}).await;

error

error[E0507]: cannot move out of `outgoing`, a captured variable in an `FnMut` closure
  --> src\main.rs:33:38
   |
31 |       let (mut outgoing, incoming) = ws_stream.split();
   |            ------------ captured outer variable
32 | 
33 |       incoming.for_each(|m| async move {
   |  ______________________________________^
34 | |         let x = &mut outgoing;
   | |                      --------
   | |                      |
   | |                      move occurs because `outgoing` has type `futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio::net::tcp::stream::TcpStream>, tungstenite::protocol::message::Message>`, which does not implement the `Copy` trait
   | |                      move occurs due to use in generator
35 | |         let b = db;
36 | |     }).await;
   | |_____^ move out of `outgoing` occurs here
Latinist answered 24/6, 2020 at 14:18 Comment(18)
Which variable is the error message complaining about? – Iman
@SolomonUcko It's this line that gives me an error; I don't know the variable, since the error message does not say: Ok(message) => do_something(message, db, &mut outgoing).await, – Latinist
Maybe try making the closure move (... move |m| async ...) – Iman
@SolomonUcko Yes, that's what I saw in a similar question on SO. It didn't work :( – Latinist
@Shepmaster OK, I added the compile error I got from running cargo run – Latinist
Does this highlight/solve your problem? #60646123 – Coddle
I wonder if the issue is from returning the result of do_something(...).await where do_something is passed references... – Iman
What happens if, in addition to the move keyword, you also move let db = &*db; to inside the closure? – Iman
@ÖmerErden I don't see any highlights when clicking the provided link. I am having a hard time understanding the code (I haven't been coding in Rust for long). I am not sure how I can implement the answer in my code to make it work. Do I need to wrap some variables in an Arc/Mutex? – Latinist
@SolomonUcko Still does not work, tried it with this code: incoming.for_each(move |m| async { let x = db;let y = &mut outgoing;}).await; – Latinist
Playground demo: play.rust-lang.org/…. Are you getting both of these error messages? – Iman
You can also try changing async to async move. – Iman
Try |m| async move and returning something other than do_something(...).await (e.g. panic!(...)), and make sure that compiles. play.rust-lang.org/… – Iman
From @SolomonUcko's link play.rust-lang.org/…, you can use fold instead of for_each to keep outgoing in hand. This will fix the compile errors, but I don't know if there is any logical error. I've explained this problem in the link that I shared in the comment above. – Coddle
@SolomonUcko When using this code: incoming.for_each(|m| async move {let x = &mut outgoing;let b = db;}).await;, I get a more specific error, which I added to the question – Latinist
@ÖmerErden I checked out fold, but I get the same compile error: incoming.fold(0, |i, m| async move {let x = &mut outgoing;let b = db;i}).await; – Latinist
@Latinist Why do you use it like that? Please check the fold solution in this link: play.rust-lang.org/… (which I've shared before) – Coddle
@ÖmerErden Thanks a million times!! I got it working now, and now I understand your post and fold in general better. If you post a short answer describing that I should use fold rather than for_each, I would be happy to accept it :). Thanks again (this took a few frustrating hours of my life). – Latinist

A closure that implements FnMut is an anonymous struct. Because the closure captured &mut outgoing, that reference becomes a field of the anonymous struct, and the field is used on every call of the closure, which can be called multiple times. If the field is lost somehow (by returning it, moving it into another scope, etc.), the program can no longer use it for later calls, so for safety the Rust compiler does not let you do this (in both of your cases).
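To make the "anonymous struct" picture concrete, here is a rough, hand-written stand-in for what the compiler generates. It is only a sketch: HandlerClosure, call_mut as an inherent method, and count_replies are hypothetical names, and a Vec<String> stands in for the real SplitSink.

```rust
// Hand-written stand-in for the anonymous struct behind a closure that
// captures `&mut outgoing`. All names here are hypothetical.
struct HandlerClosure<'a> {
    // The captured variable becomes a field of the struct.
    outgoing: &'a mut Vec<String>,
}

impl<'a> HandlerClosure<'a> {
    // Mirrors `FnMut::call_mut`: the struct is borrowed only for one call,
    // so the field may be used freely *during* the call.
    fn call_mut(&mut self, message: &str) -> usize {
        self.outgoing.push(format!("reply to {}", message));
        self.outgoing.len()
    }

    // Letting a borrow of the field escape does not compile: the reborrow of
    // `self.outgoing` lives only as long as this call's `&mut self`, while
    // the return type promises `'a`. An `async` block that uses
    // `&mut outgoing` is a return value of exactly this kind.
    //
    // fn call_mut_escaping(&mut self) -> &'a mut Vec<String> {
    //     self.outgoing
    // }
}

// Calls the "closure" once per message and returns how many replies
// ended up in the stand-in sink.
fn count_replies(messages: &[&str]) -> usize {
    let mut outgoing = Vec::new();
    let mut handler = HandlerClosure { outgoing: &mut outgoing };
    let mut sent = 0;
    for &m in messages {
        sent = handler.call_mut(m);
    }
    sent
}
```

Calling count_replies(&["a", "b", "c"]) goes through call_mut three times, which is fine because nothing borrowed from the field outlives a call.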

In your case, instead of capturing &mut outgoing, we can pass it as an argument on each call, which lets us keep ownership of outgoing. You can do this with fold from futures-rs:

incoming
    .fold(outgoing, |mut outgoing, m| async move {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e),
        }

        outgoing
    })
    .await;

This may seem a bit tricky, but it does the job: outgoing serves as a constant accumulator that is passed as an argument to our FnMut on each call and handed back at the end of it.
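The same ownership hand-off can be seen without any async machinery. The following is a synchronous analogue only, using Iterator::fold from the standard library rather than StreamExt::fold; reply_with_fold is a hypothetical name and a Vec<String> again stands in for the sink.

```rust
// Synchronous analogue: `Iterator::fold` threads ownership of the "sink"
// through every step, just as the accumulator does in the stream version.
fn reply_with_fold(messages: Vec<&str>, outgoing: Vec<String>) -> Vec<String> {
    messages.into_iter().fold(outgoing, |mut outgoing, m| {
        // The closure receives `outgoing` by value, so nothing is captured...
        outgoing.push(format!("reply to {}", m));
        // ...and returns it, so the next step receives it again.
        outgoing
    })
}
```

Because the closure captures nothing from its environment here, there is no captured variable that could escape; the value simply moves in and out on every step.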

Playground (thanks to @Solomon Ucko for creating a reproducible example)

Curtilage answered 24/6, 2020 at 20:25 Comment(0)

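Another approach that seems to work for me is to wrap outgoing in a Mutex. This is a sketch assuming tokio::sync::Mutex: the guard is held across an .await, and a std::sync::Mutex guard there would make the future non-Send, which tokio::spawn rejects. The future captures a shared reference to the mutex rather than the mutex itself:

let outgoing = tokio::sync::Mutex::new(outgoing);
// A shared reference is Copy, so each future gets its own copy of it.
let outgoing = &outgoing;
incoming
    .for_each(|m| async move {
        // Hold the lock while this message is processed.
        let mut outgoing = outgoing.lock().await;
        match m {
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!("{}", e),
        }
    })
    .await;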
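Why sharing through a Mutex avoids the "escaping" problem can be sketched with a synchronous, std-only analogue. Everything here is an assumption for illustration: reply_to_all and handler are hypothetical names, a Vec<String> stands in for the sink, and the returned inner closure plays the role of the future produced by the async block.

```rust
use std::sync::Mutex;

// The handler is called once per message and returns a "task" (standing in
// for the future). Each task carries its own copy of a shared reference to
// the mutex, so nothing is borrowed from the handler closure itself.
fn reply_to_all(messages: &[&str]) -> Vec<String> {
    let outgoing = Mutex::new(Vec::<String>::new());
    let shared = &outgoing; // `&Mutex<_>` is Copy

    let handler = |m: &str| {
        let reply = format!("reply to {}", m);
        // `move` copies the reference and takes ownership of `reply`.
        move || {
            shared.lock().unwrap().push(reply);
        }
    };

    for &m in messages {
        let task = handler(m);
        task();
    }

    // All borrows have ended, so the sink can be taken back out.
    outgoing.into_inner().unwrap()
}
```

The design trade-off compared with fold is that the sink stays in one place and is locked on every message, instead of being moved through each step.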
Jews answered 30/12, 2023 at 0:22 Comment(0)
