How do I gracefully shutdown the Tokio runtime in response to a SIGTERM?

I have a main function, where I create a Tokio runtime and run two futures on it.

use tokio;

fn main() {
    let mut runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.spawn(MyMegaFutureNumberOne {});
    runtime.spawn(MyMegaFutureNumberTwo {});

    // Some code to 'join' them after receiving an OS signal
}

How do I receive a SIGTERM, wait for all unfinished tasks (NotReadys) and exit the application?

Tailwind asked on 24/11/2018 at 13:43. Comments:
Not familiar enough with Tokio or signal handling to answer, but there's a tokio_signal library in the main Tokio repo; perhaps that's one piece of the puzzle? - Hitch
Based on your question's construction, an answer that shows how to do this without involving SIGTERM would not be accepted, correct? - Turtledove
AFAIK, Linux server applications are usually stopped with an OS signal: SIGTERM (if you use systemctl or service) or SIGINT (if the program is running in a terminal), so handling signals is the idiomatic way and is strongly preferred. Maybe it has something to do with the Runtime's shutdown_on_idle method, but I have no idea how to call it after handling a signal. - Tailwind

Dealing with signals is tricky, and explaining how to handle every possible case would be too broad. Signals are not implemented the same way on every platform, so my answer is specific to Linux. If you want to be portable across Unix-like systems, you can use the POSIX function sigaction combined with pause, which gives you more control.

The Tokio documentation has a good getting-started guide for signal handling, so I will focus on adding my own advice.

My general advice is to have one dedicated task that handles the signals, and to give each of your other tasks a receiver of a watch channel; every task stops as soon as the watch channel's value changes.
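
The same one-sender, many-receivers idea can be sketched without Tokio. The snippet below is a std-only analog, not Tokio's API: a hypothetical StopSignal type built from Arc<(Mutex<bool>, Condvar)> plays the role of the watch channel, OS threads stand in for tasks, and a thread that sleeps briefly stands in for the signal-handling task. The names StopSignal and run_workers are illustrative only.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a tokio watch channel carrying a "stop" flag.
/// Cloning is cheap: every clone shares the same flag and condvar,
/// much like cloning a watch::Receiver.
#[derive(Clone)]
struct StopSignal {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl StopSignal {
    fn new() -> Self {
        StopSignal {
            inner: Arc::new((Mutex::new(false), Condvar::new())),
        }
    }

    /// Called by the "signal handler": set the flag and wake every waiter.
    fn trigger(&self) {
        let (lock, cvar) = &*self.inner;
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Wait up to `timeout` for a stop request; returns true if one arrived.
    fn wait_for_stop(&self, timeout: Duration) -> bool {
        let (lock, cvar) = &*self.inner;
        let guard = lock.lock().unwrap();
        // Blocks while the flag is still false, or until the timeout expires.
        let (guard, _timed_out) = cvar
            .wait_timeout_while(guard, timeout, |stopped| !*stopped)
            .unwrap();
        *guard
    }
}

/// Spawns `n` workers that each count 5 ms "slices" until told to stop,
/// triggers the stop from a separate thread, then joins every worker.
/// Returns how many slices each worker completed.
fn run_workers(n: usize) -> Vec<u32> {
    let stop = StopSignal::new();

    let workers: Vec<thread::JoinHandle<u32>> = (0..n)
        .map(|_| {
            let stop = stop.clone(); // one "receiver" per worker
            thread::spawn(move || {
                let mut slices = 0;
                // Waiting on the condvar instead of sleeping lets the worker
                // wake as soon as the stop is triggered, similar to awaiting
                // stop_rx.changed().
                while !stop.wait_for_stop(Duration::from_millis(5)) {
                    slices += 1;
                }
                slices
            })
        })
        .collect();

    // Stand-in for the task that receives SIGTERM/SIGINT.
    let handler = {
        let stop = stop.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            stop.trigger();
        })
    };

    handler.join().unwrap();
    // Wait for every worker to finish before returning.
    workers.into_iter().map(|w| w.join().unwrap()).collect()
}
```

Calling run_workers(3) returns one slice count per worker once all three have observed the stop; the exact counts depend on scheduling.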

My second piece of advice is to use biased; in the select! that waits on your futures. This matters because you generally want to react to a signal immediately rather than doing other work first. By default, select! polls its branches in random order, so a branch that is ready very often (for example, in a busy loop) can keep winning over the signal branch and delay shutdown. With biased;, the branches are polled in the order they are written, so the stop branch is always checked first. Please read the documentation about biased carefully.
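
To illustrate why the order matters, here is a std-only analog, not Tokio code: a loop whose "work" branch is always ready. Checking a hypothetical stop flag first on every iteration, which is what biased; gives you in select!, means the loop exits on the very next iteration after the stop is requested, and a stop that is already pending wins before any work runs. The function name run_biased and its request_stop_after parameter are illustrative assumptions; the flag is set inline to simulate a signal arriving.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Runs an always-ready "work" branch until `stop` is observed.
/// `request_stop_after` is a hypothetical hook: the iteration at which
/// another party would have set the flag (simulated inline here).
/// Returns the number of work iterations completed.
fn run_biased(stop: &AtomicBool, request_stop_after: u32) -> u32 {
    let mut done = 0;
    loop {
        // Branch 1, checked first (like the first arm under `biased;`).
        if stop.load(Ordering::SeqCst) {
            break;
        }
        // Branch 2: the always-ready work.
        done += 1;
        if done >= request_stop_after {
            // Simulate the signal task sending on the watch channel.
            stop.store(true, Ordering::SeqCst);
        }
    }
    done
}
```

With an unbiased select!, the equivalent Tokio loop could pick the work arm first even when both arms are ready, so one or more extra work iterations might run before the stop is noticed.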

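// Requires the tokio crate (with the "full" feature) and the libc crate; Unix only.
use core::time::Duration;

use tokio::{
    select,
    signal::unix::{signal, SignalKind},
    sync::watch,
    time::sleep,
};

#[tokio::main]
async fn main() {
    // The sender is owned by the signal task; every other task gets a receiver.
    let (stop_tx, mut stop_rx) = watch::channel(());

    // Dedicated task that only listens for SIGTERM and SIGINT.
    tokio::spawn(async move {
        let mut sigterm = signal(SignalKind::terminate()).unwrap();
        let mut sigint = signal(SignalKind::interrupt()).unwrap();
        loop {
            select! {
                _ = sigterm.recv() => println!("Received SIGTERM"),
                _ = sigint.recv() => println!("Received SIGINT"),
            };
            // Notify every receiver; the error returned when all receivers
            // are gone is deliberately ignored.
            let _ = stop_tx.send(());
        }
    });

    loop {
        select! {
            // Poll branches in the order written, so the stop branch is checked first.
            biased;

            _ = stop_rx.changed() => break,
            i = some_operation(42) => {
                println!("Result is {i}");
                // Demo only: send SIGTERM to our own process so the loop stops.
                unsafe { libc::raise(libc::SIGTERM) };
            },
        }
    }
}

async fn some_operation(i: u64) -> u64 {
    println!("Task started.");
    sleep(Duration::from_millis(i)).await;
    println!("Task shutting down.");
    i
}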

You can clone the receiver as many times as needed (typically one clone per task), which makes it cheap to propagate the shutdown signal to every task.


Tokio 0.1

One way to achieve this is to use the tokio_signal crate to catch signals, as in this example adapted from its documentation:

extern crate futures;
extern crate tokio;
extern crate tokio_signal;

use futures::prelude::*;
use futures::Stream;
use std::time::{Duration, Instant};
use tokio_signal::unix::{Signal, SIGINT, SIGTERM};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut runtime = tokio::runtime::Runtime::new()?;

    let sigint = Signal::new(SIGINT).flatten_stream();
    let sigterm = Signal::new(SIGTERM).flatten_stream();

    let stream = sigint.select(sigterm);

    let deadline = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(5))
        .map(|()| println!("5 seconds are over"))
        .map_err(|e| eprintln!("Failed to wait: {}", e));

    runtime.spawn(deadline);

    let (item, _rest) = runtime
        .block_on_all(stream.into_future())
        .map_err(|_| "failed to wait for signals")?;

    let item = item.ok_or("received no signal")?;
    if item == SIGINT {
        println!("received SIGINT");
    } else {
        assert_eq!(item, SIGTERM);
        println!("received SIGTERM");
    }

    Ok(())
}

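This program catches the selected signals and, because block_on_all waits until the runtime is idle, it also waits for all currently spawned tasks (here, the 5-second deadline) to complete before exiting. It does not seem to work on Windows, where the program shuts down instantly.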
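
The structure of the Tokio 0.1 example (merge two signal streams, take the first item, then let block_on_all drain the spawned deadline task) can be mirrored with the standard library alone. This is a std-only analog, not tokio_signal: the Sig enum and first_signal_then_drain function are hypothetical, threads that sleep and then send on an mpsc channel simulate SIGINT and SIGTERM, and a short sleep replaces the 5-second deadline.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-ins for the two signals the example selects on.
#[derive(Debug, PartialEq)]
enum Sig {
    Int,
    Term,
}

/// Waits for whichever simulated signal arrives first, then waits for the
/// spawned "deadline" work (and the simulated sources) before returning,
/// mirroring what block_on_all does with background futures.
fn first_signal_then_drain(int_after_ms: u64, term_after_ms: u64) -> (Sig, bool) {
    let (tx, rx) = mpsc::channel();

    // Two sources feeding one channel: like sigint.select(sigterm), the
    // receiver yields items from either source in arrival order.
    let sources: Vec<thread::JoinHandle<()>> =
        vec![(Sig::Int, int_after_ms), (Sig::Term, term_after_ms)]
            .into_iter()
            .map(|(sig, after)| {
                let tx = tx.clone();
                thread::spawn(move || {
                    thread::sleep(Duration::from_millis(after));
                    let _ = tx.send(sig); // ignore the error if nobody listens
                })
            })
            .collect();
    drop(tx); // only the source threads hold senders now

    // Stand-in for the spawned `deadline` task.
    let deadline = thread::spawn(|| {
        thread::sleep(Duration::from_millis(30));
        true
    });

    // Block until the first signal arrives (like stream.into_future()).
    let first = rx.recv().expect("both sources hung up without sending");

    // Like block_on_all: do not return until the spawned work has finished.
    let deadline_done = deadline.join().unwrap();
    for source in sources {
        source.join().unwrap();
    }
    (first, deadline_done)
}
```

For example, first_signal_then_drain(10, 300) reports Sig::Int because the simulated SIGINT fires first, and it only returns after the deadline thread has finished.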

Fibriform answered on 27/11/2018 at 18:14

For Tokio version 1.x.y, the official Tokio tutorial has a page on this topic: Graceful shutdown

Ricercar answered on 1/9/2021 at 7:06. Comment:
Thanks for the answer! At least when I read it today, the linked page suggested signal::ctrl_c(). Looking at the code, it seems to react to SIGINT on Unix systems instead of SIGTERM as in the original question. This might matter on Kubernetes or similar platforms. Waiting for SIGTERM would look something like: tokio::signal::unix::signal(SignalKind::terminate()).unwrap().recv().await - Tarrasa
