How can I use Tokio to trigger a function every period or interval in seconds?
Asked Answered
A

2

22

In Node.js I can set the interval that a certain event should be triggered,

function intervalFunc() {
  console.log('whelp, triggered again!');
}

setInterval(intervalFunc, 1500);

However the interface for Tokio's interval is a bit more complex. It seems to be a something to do with a much more literal definition of an interval, and rather than calling a function at an interval, it simply stalls the thread until the time passes (with .await).

Is there a primitive in Tokio that calls a function "every x seconds" or the like? If not, is there an idiom that has emerged to do this?

I only need to run one function on a recurring interval... I don't care about other threads either. It's just one function on Tokio's event loop.

Agape answered 30/3, 2021 at 0:40 Comment(0)
R
37

Spawn a Tokio task to do something forever:

use std::time::Duration;
use tokio::{task, time}; // 1.3.0

#[tokio::main]
async fn main() {
    let forever = task::spawn(async {
        let mut interval = time::interval(Duration::from_millis(10));

        loop {
            interval.tick().await;
            do_something().await;
        }
    });

    forever.await;
}

You can also use tokio::time::interval to create a value that you can tick repeatedly. Perform the tick and call your function inside of the body of stream::unfold to create a stream:

use futures::{stream, StreamExt}; // 0.3.13
use std::time::{Duration, Instant};
use tokio::time; // 1.3.0

#[tokio::main]
async fn main() {
    let interval = time::interval(Duration::from_millis(10));

    let forever = stream::unfold(interval, |mut interval| async {
        interval.tick().await;
        do_something().await;
        Some(((), interval))
    });

    let now = Instant::now();
    forever.for_each(|_| async {}).await;
}

async fn do_something() {
    eprintln!("do_something");
}

See also:

Romanov answered 30/3, 2021 at 1:12 Comment(8)
What does this even do: for_each(|_| async {}).await;Agape
@EvanCarroll docs.rs/futures/0.3.13/futures/stream/…Romanov
man that's confusing.. you can't do this with .repeat?Agape
repeat says: Create a stream which produces the same item repeatedly, which doesn't do what you want.Romanov
Well, it would seem like I could create a stream of function pointers which sleeps for the interval and runs them, or create a stream a of functions pointers and stream of intervals and combine them and await or something.Agape
Your second example does not run for ever. Can you modify this example to do that?Agape
@EvanCarroll forever.await instead of the timeout.Romanov
I think we could demonstrate these two things substantially clearer if we reduced both to looping infinitely rather than 10 times and such, and dropped the elapsed calculation which isn't really relative to what you're trying to show. Just some points (i upvoted and marked this as chosen though). I also asked a follow up, stackoverflow.com/q/66898839/124486Agape
P
4

I am still a rust/tokio beginner, but I did find this solution helpful for myself:

use std::time::Duration;
use tokio::time;
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() {
    let mut stream = IntervalStream::new(time::interval(Duration::from_secs(1)));

    while let Some(_ts) = stream.next().await {
        println!("whelp, triggered again!");
    }
}

Please note that _ts holds the execution timestamp.

Penance answered 10/1, 2023 at 21:41 Comment(1)
albeit this is a less general solution, it is more suitable for most cases of time-repeated streams in our codebase.Tumbling

© 2022 - 2024 — McMap. All rights reserved.