I want to create a simple websocket server. I want to process the incoming messages and send a response, but I get an error:
error: captured variable cannot escape `FnMut` closure body
  --> src\main.rs:32:27
   |
32 |       incoming.for_each(|m| async {
   |  _________________________-_^
   | |                         |
   | |                         inferred to be a `FnMut` closure
33 | |         match m {
34 | |             // Error here...
35 | |             Ok(message) => do_something(message, db, &mut outgoing).await,
36 | |             Err(e) => panic!(e)
37 | |         }
38 | |     }).await;
   | |_____^ returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape
Searching for this error gives a few hits on Stack Overflow, but I don't see anywhere in my code where a variable is escaping. The async block won't run concurrently, so I don't see any problem. Furthermore, I feel like I am doing something very simple: I get a type which allows me to send data back to the client, but when I use a reference to it in the async block, I get a compile error. The error only occurs when I use the `outgoing` or `db` variable in the async code.
This is my code (the error is in the `handle_connection` function):
main.rs
use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;

struct DatabaseConnection;

#[tokio::main]
async fn main() -> Result<(), ()> {
    listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}

async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
    let try_socket = TcpListener::bind(address).await;
    let mut listener = try_socket.expect("Failed to bind on address");

    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(stream, addr, db.clone()));
    }

    Ok(())
}

async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
    let db = &*db;
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();
    let (mut outgoing, incoming) = ws_stream.split();

    // Adding 'move' does also not work
    incoming.for_each(|m| async {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e)
        }
    }).await;
}

async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
    // Do something...

    // Send some message
    let _ = outgoing.send(Message::Text("yay".to_string())).await;
}
Cargo.toml
[dependencies]
futures = "0.3.*"
futures-channel = "0.3.*"
futures-util = "0.3.*"
tokio = { version = "0.2.*", features = [ "full" ] }
tokio-tungstenite = "0.10.*"
tungstenite = "0.10.*"
When using `async move`, I get the following error:
code
incoming.for_each(|m| async move {
    let x = &mut outgoing;
    let b = db;
}).await;
error
error[E0507]: cannot move out of `outgoing`, a captured variable in an `FnMut` closure
  --> src\main.rs:33:38
   |
31 |       let (mut outgoing, incoming) = ws_stream.split();
   |            ------------ captured outer variable
32 |
33 |       incoming.for_each(|m| async move {
   |  ______________________________________^
34 | |         let x = &mut outgoing;
   | |                      --------
   | |                      |
   | |                      move occurs because `outgoing` has type `futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio::net::tcp::stream::TcpStream>, tungstenite::protocol::message::Message>`, which does not implement the `Copy` trait
   | |                      move occurs due to use in generator
35 | |         let b = db;
36 | |     }).await;
   | |_____^ move out of `outgoing` occurs here
[...] `Ok(message) => do_something(message, db, &mut outgoing).await,` – Latinist
[...] `move` (`... move |m| async ...`) – Iman
[...] `cargo run` – Latinist
[...] `do_something(...).await` where `do_something` is passed references... – Iman
[...] `move` keyword, you also move `let db = &*db;` to inside the closure? – Iman
[...] `incoming.for_each(move |m| async { let x = db;let y = &mut outgoing;}).await;` – Latinist
[...] `async` to `async move`. – Iman
[...] `|m| async move` and returning something other than `do_something(...).await` (e.g. `panic!(...)`), and make sure that compiles. play.rust-lang.org/… – Iman
[...] `fold` instead of `for_each` to keep `outgoing` in hand; this will fix the compile errors, but I don't know if there is any logical error. I've explained this problem in the link that I shared in the comment above. – Coddle
[...] `incoming.for_each(|m| async move {let x = &mut outgoing;let b = db;}).await;`, I get a more specific error, which I added in the question – Latinist
[...] `incoming.fold(0, |i, m| async move {let x = &mut outgoing;let b = db;i}).await;` – Latinist
[...] `fold` solution in this link: play.rust-lang.org/… (which I've shared before) – Coddle
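The `fold` suggestion from the comments can be sketched without any async runtime. What follows is a minimal illustration, not the code behind the playground links: `Sink`, `fold_async`, `handle_messages`, and `block_on` are hypothetical stand-ins written with the standard library only, and `fold_async` merely mimics the closure shape of `StreamExt::fold` from the futures crate. The idea it tries to show is that the closure receives the sink by value, the `async move` block owns it while it runs, and the future hands it back, so no reference to a captured variable has to outlive the closure call.

```rust
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical stand-in for the WebSocket sink (`outgoing` in the question).
#[derive(Debug, Default)]
pub struct Sink {
    pub sent: Vec<String>,
}

impl Sink {
    /// Stand-in for `SinkExt::send`; it completes immediately.
    pub async fn send(&mut self, text: String) {
        self.sent.push(text);
    }
}

/// Hypothetical helper with the same closure shape as `StreamExt::fold`:
/// the closure takes the accumulator by value and its future yields it back.
pub async fn fold_async<I, T, F, Fut>(items: I, init: T, mut f: F) -> T
where
    I: IntoIterator,
    F: FnMut(T, I::Item) -> Fut,
    Fut: Future<Output = T>,
{
    let mut acc = init;
    for item in items {
        acc = f(acc, item).await;
    }
    acc
}

/// Analogue of `do_something`: borrows the sink mutably for one message.
async fn do_something(message: &str, outgoing: &mut Sink) {
    outgoing.send(format!("echo: {}", message)).await;
}

/// Analogue of `handle_connection`: the sink is moved into each future
/// and returned from it, instead of being borrowed from the closure.
pub async fn handle_messages(incoming: Vec<&str>) -> Sink {
    fold_async(incoming, Sink::default(), |mut outgoing: Sink, m| async move {
        do_something(m, &mut outgoing).await;
        outgoing // give ownership back for the next message
    })
    .await
}

fn noop_raw_waker() -> RawWaker {
    fn no_op(_: *const ()) {}
    fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Tiny busy-polling executor so the sketch needs no async runtime.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: every vtable function is a no-op and never touches the data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With these definitions, `block_on(handle_messages(vec!["a", "b"]))` returns a `Sink` whose `sent` field is `["echo: a", "echo: b"]`. In the original program the same closure shape would presumably be passed to `incoming.fold(outgoing, ...)`; whether that compiles unchanged against tokio-tungstenite 0.10 is an assumption.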