Await for future again after tokio::time::timeout
Asked Answered
C

2

5

Background:
I have a process using tokio::process to spawn child processes with handles in the tokio runtime.

It is also responsible for freeing the resources after killing a child and, according to the documentation (std::process::Child, tokio::process::Child), this requires the parent to wait() (or await in tokio) for the process.

Not all process behave the same to a SIGINT or a SIGTERM, so I wanted to give the child some time to die, before I send a SIGKILL.

Desired solution:

    pub async fn kill(self) {
        // Close input
        std::mem::drop(self.stdin);

        // Send gracefull signal
        let pid = nix::unistd::Pid::from_raw(self.process.id() as nix::libc::pid_t);
        nix::sys::signal::kill(pid, nix::sys::signal::SIGINT);

        // Give the process time to die gracefully
        if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), self.process).await
        {
            // Kill forcefully
            nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL);
            self.process.await;
        }
    }

However this error is given:

error[E0382]: use of moved value: `self.process`
  --> src/bin/multi/process.rs:46:13
   |
42 |         if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), self.process).await
   |                                                                                 ------------ value moved here
...
46 |             self.process.await;
   |             ^^^^^^^^^^^^ value used here after move
   |
   = note: move occurs because `self.process` has type `tokio::process::Child`, which does not implement the `Copy` trait

And if I obey and remove the self.process.await, I see the child process still taking resources in ps.

Question:
How can I await for an amount of time and perform actions and await again if the amount of time expired?

Note:
I solved my immediate problem by setting a tokio timer that always sends the SIGKILL after two seconds, and having a single self.process.await at the bottom. But this solution is not desirable since another process may spawn in the same PID while the timer is running.

Edit:
Adding a minimal, reproducible example (playground)

async fn delay() {
    for _ in 0..6 {
        tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
        println!("Ping!");
    }
}

async fn runner() {
    let delayer = delay();
    if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), delayer).await {
        println!("Taking more than two seconds");
        delayer.await;
    }
}
Clevelandclevenger answered 5/4, 2020 at 11:29 Comment(0)
P
8

You needed to pass a mutable reference. However, you first need to pin the future in order for its mutable reference to implement Future. pin_mut re-exported from the futures crate is a good helper around this:

use futures::pin_mut;

async fn delay() {
    for _ in 0..6 {
        tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
        println!("Ping!");
    }
}

async fn runner() {
    let delayer = delay();
    pin_mut!(delayer);
    if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), &mut delayer).await {
        println!("Taking more than two seconds");
        delayer.await;
    }
}
Pericline answered 5/4, 2020 at 13:50 Comment(2)
This is right to the point! :) Also, I was able to avoid bringing the whole crate in, since pin_mut is a fairly simple macro. I added the links to your answer so it is easier for others to find the crate (and the implementation of the macro)Clevelandclevenger
Thanks, this helped me, but could you explain why this works? The documentation for timeout says "If the future completes before the duration has elapsed, then the completed value is returned. Otherwise, an error is returned and the future is canceled". So why isn't it cancelled if it's pin_mut'ed?Rokach
M
1

A generic wrapper could be reused for these cases:

use std::time::Duration;
use futures::pin_mut;
use tokio::time::timeout;

pub async fn on_slow<T, S: FnOnce()>(
    future: impl Future<Output = T>,
    duration: Duration,
    fn_on_slow: S,
) -> T {
    pin_mut!(future);
    if let Ok(result) = timeout(duration, &mut future).await {
        result
    } else {
        fn_on_slow();
        future.await
    }
}

// usage
async fn runner() {
  on_slow(
    delay(), // some long running future WITHOUT .await
    Duration::from_secs(2),
    || println!("Taking more than two seconds")
  ).await
}
Magnetochemistry answered 8/10, 2023 at 19:50 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.