I am trying to write a multithreaded TCP communication program in Rust.
The idea is that a listening socket lives on the main thread and, as connections come in, the work is handled by worker threads.
I previously used the ThreadPool approach I found in the Rust book, but as I understand it, tokio is able to 'automatically' assign work to threads from a pool.
I am confused about the difference between OS threads and tokio tasks (mostly because you use spawn to create both).
Here is some code:
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

fn main() {
    println!("Hello World!");
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 103, 7)), 2048);
    println!("socket -> {}", socket);
    // You need to use the tokio runtime to execute async functions.
    // new_current_thread() builds a single-threaded runtime: every task runs on this thread.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let listener = StartListen::new(socket).await.unwrap();
    });
}
I also have a StartListen struct defined in another file:
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::time::{sleep, Duration};

// Defines the StartListen struct
pub struct StartListen {
    listener: TcpListener,
}

// Implementation for the StartListen struct
// (StartListenError is defined elsewhere and not shown here)
impl StartListen {
    pub async fn new(socket: SocketAddr) -> Result<StartListen, StartListenError> {
        println!("Attempting to listen");
        let bind_result = TcpListener::bind(socket).await;
        sleep(Duration::from_secs(5)).await;
        match bind_result {
            Ok(listener) => {
                println!("Server is listening on {}", socket);
                Ok(StartListen { listener })
            }
            Err(err) => Err(StartListenError::BindError(err)),
        }
    }
}
To add some more context, the idea is that this socket expects 2 types of messages:
// Defines the types of messages to expect
pub enum RequestType {
    RequestWork {
        request_message: String,
        parameter: String,
        sender_timestamp: String,
    },
    CloseConnection {
        initial_timestamp: String,
        final_timestamp: String,
    },
    Invalid(String),
}
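To make the enum above more concrete, here is a minimal sketch of how a received line of text might be mapped onto its variants. The wire format is not given in the question, so the pipe-delimited layout, the WORK and CLOSE keywords, and the parse_request function are all assumptions made purely for illustration. The derives on the enum are also added here so the values can be compared and printed.

```rust
/// Mirrors the enum from the question; Debug and PartialEq are added for this sketch.
#[derive(Debug, PartialEq)]
pub enum RequestType {
    RequestWork {
        request_message: String,
        parameter: String,
        sender_timestamp: String,
    },
    CloseConnection {
        initial_timestamp: String,
        final_timestamp: String,
    },
    Invalid(String),
}

/// Parses one message under an ASSUMED wire format (not from the question):
///   "WORK|<message>|<parameter>|<timestamp>"
///   "CLOSE|<initial_timestamp>|<final_timestamp>"
/// Anything else becomes `Invalid` carrying the trimmed raw text.
pub fn parse_request(raw: &str) -> RequestType {
    let trimmed = raw.trim();
    let fields: Vec<&str> = trimmed.split('|').collect();
    match fields.as_slice() {
        ["WORK", message, parameter, timestamp] => RequestType::RequestWork {
            request_message: message.to_string(),
            parameter: parameter.to_string(),
            sender_timestamp: timestamp.to_string(),
        },
        ["CLOSE", initial, last] => RequestType::CloseConnection {
            initial_timestamp: initial.to_string(),
            final_timestamp: last.to_string(),
        },
        _ => RequestType::Invalid(trimmed.to_string()),
    }
}
```

Under these assumptions, a connection handler could call parse_request on the text it reads from the socket and then match on the returned variant. A real protocol would also need message framing, since a single TCP read may contain a partial message or several messages.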
I have yet to add a handle_connection method. Would I have to define handle_connection so that it sits in a loop and spawns tasks?
// Lives inside `impl StartListen`; read() needs `use tokio::io::AsyncReadExt;`
pub async fn accept_connections(&self) {
    loop {
        let (mut stream, addr) = self.listener.accept().await.unwrap();
        println!("New connection from {}", addr);
        // Spawn a new task to handle the connection
        tokio::spawn(async move {
            let mut buffer = [0; 1024];
            loop {
                let n = match stream.read(&mut buffer).await {
                    Ok(0) => return, // Connection closed
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("Error reading from socket: {}", e);
                        return;
                    }
                };
                // Convert the received message into a RequestType
                let message = String::from_utf8_lossy(&buffer[..n]);
                // ... (rest of the handler not shown)
            }
        });
    }
}
... spawn_blocking to run the computation on a dedicated thread, and bring the result back into the Tokio context when it's completed. This leaves the Tokio worker threads free to handle other, shorter tasks, or I/O. – Canonicity
Use tokio for async LAN connections and for the runtime (#[tokio::main]). Then use an async database crate for the database connection, and spawn_blocking for the computational work. – Cloutman
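The idea in these comments, running the computation on a dedicated thread and handing the result back, can be sketched with only the standard library. This is a rough analogy, not Tokio code: expensive_computation and offload are hypothetical names, and a std channel stands in for the handle that tokio::task::spawn_blocking returns. In Tokio, that handle would be awaited, which yields to other tasks instead of blocking the calling thread as recv() does here.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the CPU-heavy work: the sum of squares from 1 to `n`.
fn expensive_computation(n: u64) -> u64 {
    (1..=n).map(|x| x * x).sum()
}

/// Starts the computation on a dedicated OS thread and returns a receiver
/// that yields the result once the computation has finished.
fn offload(n: u64) -> mpsc::Receiver<u64> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // send() only fails if the receiver was dropped; nothing to do in that case.
        let _ = tx.send(expensive_computation(n));
    });
    rx
}
```

A caller would keep the receiver returned by offload, carry on with other work, and call recv() on it when the result is needed.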