How do I schedule a repeating task in Tokio?
Asked Answered
C

2

10

I am replacing synchronous socket code written in Rust with the asynchronous equivalent using Tokio. Tokio uses futures for asynchronous activity so tasks are chained together and queued onto an executor to be executed by a thread pool.

The basic pseudocode for what I want to do is like this:

let tokio::net::listener = TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    let in_buf = vec![0u8; 8192];
    // TODO this should happen continuously until an error happens
    let read_task = tokio::io::read(socket, in_buf).and_then(move |(socket, in_buf, bytes_read)| {
        /* ... Logic I want to happen repeatedly as bytes are read ... */
        Ok(())
    };
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);

This pseudocode would only execute my task once. How do I run it continuously? I want it to execute and then execute again and again etc. I only want it to stop executing if it panics or has an error result code. What's the simplest way of doing that?

Cholla answered 13/3, 2018 at 11:11 Comment(0)
H
1

Using loop_fn should work:

let read_task =
    futures::future::loop_fn((socket, in_buf, 0), |(socket, in_buf, bytes_read)| {
        if bytes_read > 0 { /* handle bytes */ }

        tokio::io::read(socket, in_buf).map(Loop::Continue)
    });
Hemianopsia answered 13/3, 2018 at 23:2 Comment(1)
loop_fn doesn't seem to exist in futures 0.3 anymore, unfortunatelyFortyfour
C
0

A clean way to accomplish this and not have to fight the type system is to use tokio-codec crate; if you want to interact with the reader as a stream of bytes instead of defining a codec you can use tokio_codec::BytesCodec.

use tokio::codec::Decoder;
use futures::Stream;
...

let tokio::net::listener = TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    let (_writer, reader) = tokio_codec::BytesCodec::new().framed(socket).split();
    let read_task = reader.for_each(|bytes| {
            /* ... Logic I want to happen repeatedly as bytes are read ... */
    });
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);
Cathode answered 8/4, 2019 at 23:28 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.