如何优雅地响应SIGTERM信号关闭Tokio运行时?

18

我有一个main函数,在其中创建了一个Tokio运行时,并在其上运行了两个Future。

use tokio;

fn main() {
    let mut runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.spawn(MyMegaFutureNumberOne {});
    runtime.spawn(MyMegaFutureNumberTwo {});

    // Some code to 'join' them after receiving an OS signal
}

我该如何接收 SIGTERM,等待所有未完成的任务(NotReady)并退出应用程序?


对于Tokio或信号处理不太熟悉,但是主要的Tokio存储库中有一个tokio_signal库 - 也许这是谜题的一部分? - Joe Clay
根据您的问题构造,一个不涉及SIGTERM的解决方案将不被接受,对吗? - Shepmaster
据我所知,Linux服务器应用程序通常使用类似于SIGTERM(如果您使用systemctl或service)或SIGINT(如果在终端中运行)的操作系统信号停止,因此这是一种惯用方式。因此,强烈建议处理信号。也许这与Runtime的shutdown_on_idle方法有关,但是我不知道如何在处理信号后调用它。 - hedgar2017
2个回答

19

处理信号是棘手的,解释如何处理所有可能情况会太广泛。信号的实现在不同平台上并不标准,因此我的答案仅适用于Linux。如果您想要更跨平台的解决方案,请使用POSIX函数sigaction结合pause;这将为您提供更多控制。

tokio的文档有一个很好的入门指南,介绍了如何在tokio中处理信号。因此,我会尝试添加自己的建议。

我的一般建议是有一个任务来处理信号,然后在其他任务中使用监视通道,如果监视通道状态发生变化,则停止该任务。

我的第二个建议是在等待您的 futures 的 select 中使用 biased,这很重要,因为通常您希望立即知道是否已收到信号,而不是在执行其他操作之前。这可能会导致繁忙循环出现问题,因为它经常准备好,您永远无法获得信号 future 分支。请仔细阅读有关 biased文档

use core::time::Duration;

use tokio::{
    select,
    signal::unix::{signal, SignalKind},
    sync::watch,
    time::sleep,
};

#[tokio::main]
async fn main() {
    let (stop_tx, mut stop_rx) = watch::channel(());

    tokio::spawn(async move {
        let mut sigterm = signal(SignalKind::terminate()).unwrap();
        let mut sigint = signal(SignalKind::interrupt()).unwrap();
        loop {
            select! {
                _ = sigterm.recv() => println!("Recieve SIGTERM"),
                _ = sigint.recv() => println!("Recieve SIGTERM"),
            };
            stop_tx.send(()).unwrap();
        }
    });

    loop {
        select! {
            biased;

            _ = stop_rx.changed() => break,
            i = some_operation(42) => {
                println!("Result is {i}");
                unsafe { libc::raise(libc::SIGTERM)};
            },
        }
    }
}

async fn some_operation(i: u64) -> u64 {
    println!("Task started.");
    sleep(Duration::from_millis(i)).await;
    println!("Task shutting down.");
    i
}

您可以根据需要克隆通道的接收器,这将使处理信号更加高效。

Tokio 0.1

实现您想要的一种方法是使用tokio_signal crate来捕获信号,像这样:(doc example)

extern crate futures;
extern crate tokio;
extern crate tokio_signal;

use futures::prelude::*;
use futures::Stream;
use std::time::{Duration, Instant};
use tokio_signal::unix::{Signal, SIGINT, SIGTERM};

fn main() -> Result<(), Box<::std::error::Error>> {
    let mut runtime = tokio::runtime::Runtime::new()?;

    let sigint = Signal::new(SIGINT).flatten_stream();
    let sigterm = Signal::new(SIGTERM).flatten_stream();

    let stream = sigint.select(sigterm);

    let deadline = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(5))
        .map(|()| println!("5 seconds are over"))
        .map_err(|e| eprintln!("Failed to wait: {}", e));

    runtime.spawn(deadline);

    let (item, _rest) = runtime
        .block_on_all(stream.into_future())
        .map_err(|_| "failed to wait for signals")?;

    let item = item.ok_or("received no signal")?;
    if item == SIGINT {
        println!("received SIGINT");
    } else {
        assert_eq!(item, SIGTERM);
        println!("received SIGTERM");
    }

    Ok(())
}

该程序将等待所有当前任务完成并捕获所选信号。但在Windows上似乎无法正常工作,因为它会立即关闭程序。


10

对于 Tokio 版本 1.x.y,官方的 Tokio 教程有一个关于这个主题的页面:优雅关闭


1
感谢您的回答!至少在今天阅读时,建议使用以下链接:signal::ctrl_c()查看代码,它似乎会对Unix系统上的SIGINT做出反应,而不是像原始问题中那样对SIGTERM做出反应。这在Kubernetes或类似系统中可能很重要。等待SIGTERM将类似于:tokio::signal::unix::signal(SignalKind::terminate()).unwrap().recv().await - Peter Kolloch

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接