如何终止一个阻塞的tokio任务?

3
在我的应用程序中,我有一个阻塞任务,同步地从队列中读取消息并将其提供给正在运行的任务。所有这些都正常工作,但我遇到的问题是进程无法正确终止,因为queue_reader任务没有停止。
我根据tokio文档构建了一个小示例:https://docs.rs/tokio/1.20.1/tokio/task/fn.spawn_blocking.html
use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
    let (incoming_tx, mut incoming_rx) = mpsc::channel(2);
    // Some blocking task that never ends
    let queue_reader = task::spawn_blocking(move || {
        loop {
            // Stand in for receiving messages from queue
            incoming_tx.blocking_send(5).unwrap();
        }
    });

    let mut acc = 0;
    // Some complex condition that determines whether the job is done
    while acc < 95 {
        tokio::select! {
            Some(v) = incoming_rx.recv() => {
                acc += v;
            }
        }
    }
    assert_eq!(acc, 95);
    println!("Finalizing thread");
    queue_reader.abort(); // This doesn't seem to terminate the queue_reader task
    queue_reader.await.unwrap(); // <-- The process hangs on this task.
    println!("Done");
}

一开始我以为queue_reader.abort()可以终止任务,但事实并非如此。我的期望是,只有使用内部.await的任务才能由tokio控制并终止。这样做是正确的吗?

为了终止queue_reader任务,我引入了一个oneshot通道,在其中发出终止信号,如下代码段所示。

use tokio::task;
use tokio::sync::{oneshot, mpsc};

#[tokio::main]
async fn main() {
    let (incoming_tx, mut incoming_rx) = mpsc::channel(2);
    // A new channel to communicate when the process must finish.
    let (term_tx, mut term_rx) = oneshot::channel();
    // Some blocking task that never ends
    let queue_reader = task::spawn_blocking(move || {
        // As long as termination is not signalled
        while term_rx.try_recv().is_err() {
            // Stand in for receiving messages from queue
            incoming_tx.blocking_send(5).unwrap();
        }
    });

    let mut acc = 0;
    // Some complex condition that determines whether the job is done
    while acc < 95 {
        tokio::select! {
            Some(v) = incoming_rx.recv() => {
                acc += v;
            }
        }
    }
    assert_eq!(acc, 95);
    // Signal termination
    term_tx.send(()).unwrap();
    println!("Finalizing thread");
    queue_reader.await.unwrap();
    println!("Done");
}

我的问题是,这是规范/最佳方式吗?还是有更好的选择?


请查看 https://tokio.rs/tokio/topics/shutdown。 - Chayim Friedman
1
@ÖmerErden CancellationToken 是一个很好的建议,因为在同步块内,我可以使用同步函数 is_cancelled() 来检查终止。 - Peterpaul Klein Haneveld
我从未想过它可能有一个同步方法。我会在我的答案中添加这个。 - Peter Hall
1个回答

2

Tokio无法终止CPU-bound/阻塞任务。

从技术上讲,杀死操作系统线程是可行的,但通常不是一个好主意,因为创建新线程是昂贵的,并且可能会使您的程序处于无效状态。即使Tokio认为这是值得实现的功能,它也将严重限制其实现-它将被迫进入多线程模型,仅支持在阻塞任务完成之前终止该任务的可能性。

您的解决方案非常好; 将终止自己的责任交给您的阻塞任务,并提供一种告诉它这样做的方法。如果此功能是库的一部分,则可以通过返回具有cancel()方法的任务的“句柄”来抽象机制。

是否有更好的替代方案?也许,但这取决于其他因素。您的解决方案很好且易于扩展,例如,如果以后需要向任务发送不同类型的信号。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接