在我的应用程序中,我有一个阻塞任务,同步地从队列中读取消息并将其提供给正在运行的任务。所有这些都正常工作,但我遇到的问题是进程无法正确终止,因为
我根据tokio文档构建了一个小示例:https://docs.rs/tokio/1.20.1/tokio/task/fn.spawn_blocking.html
queue_reader
任务没有停止。我根据tokio文档构建了一个小示例:https://docs.rs/tokio/1.20.1/tokio/task/fn.spawn_blocking.html
use tokio::sync::mpsc;
use tokio::task;
#[tokio::main]
async fn main() {
let (incoming_tx, mut incoming_rx) = mpsc::channel(2);
// Some blocking task that never ends
let queue_reader = task::spawn_blocking(move || {
loop {
// Stand in for receiving messages from queue
incoming_tx.blocking_send(5).unwrap();
}
});
let mut acc = 0;
// Some complex condition that determines whether the job is done
while acc < 95 {
tokio::select! {
Some(v) = incoming_rx.recv() => {
acc += v;
}
}
}
assert_eq!(acc, 95);
println!("Finalizing thread");
queue_reader.abort(); // This doesn't seem to terminate the queue_reader task
queue_reader.await.unwrap(); // <-- The process hangs on this task.
println!("Done");
}
一开始我以为queue_reader.abort()
可以终止任务,但事实并非如此。我的期望是,只有使用内部.await
的任务才能由tokio
控制并终止。这样做是正确的吗?
为了终止queue_reader
任务,我引入了一个oneshot
通道,在其中发出终止信号,如下代码段所示。
use tokio::task;
use tokio::sync::{oneshot, mpsc};
#[tokio::main]
async fn main() {
let (incoming_tx, mut incoming_rx) = mpsc::channel(2);
// A new channel to communicate when the process must finish.
let (term_tx, mut term_rx) = oneshot::channel();
// Some blocking task that never ends
let queue_reader = task::spawn_blocking(move || {
// As long as termination is not signalled
while term_rx.try_recv().is_err() {
// Stand in for receiving messages from queue
incoming_tx.blocking_send(5).unwrap();
}
});
let mut acc = 0;
// Some complex condition that determines whether the job is done
while acc < 95 {
tokio::select! {
Some(v) = incoming_rx.recv() => {
acc += v;
}
}
}
assert_eq!(acc, 95);
// Signal termination
term_tx.send(()).unwrap();
println!("Finalizing thread");
queue_reader.await.unwrap();
println!("Done");
}
我的问题是,这是规范/最佳方式吗?还是有更好的选择?
CancellationToken
是一个很好的建议,因为在同步块内,我可以使用同步函数is_cancelled()
来检查终止。 - Peterpaul Klein Haneveld