如何在Tokio中安排重复任务?

9
我正在用Tokio将使用Rust编写的同步套接字代码替换为异步等效代码。Tokio使用 futures 进行异步活动,因此任务被链接在一起并排队到执行线程池中执行。
我想要做的基本伪代码如下:
let tokio::net::listener = TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    let in_buf = vec![0u8; 8192];
    // TODO this should happen continuously until an error happens
    let read_task = tokio::io::read(socket, in_buf).and_then(move |(socket, in_buf, bytes_read)| {
        /* ... Logic I want to happen repeatedly as bytes are read ... */
        Ok(())
    };
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);

这个伪代码只会执行一次我的任务。我该如何让它持续运行?我希望它能够执行一次又一次,直到出现恐慌或错误结果代码才停止执行。最简单的方法是什么?

2个回答

1
使用loop_fn应该可以工作:
let read_task =
    futures::future::loop_fn((socket, in_buf, 0), |(socket, in_buf, bytes_read)| {
        if bytes_read > 0 { /* handle bytes */ }

        tokio::io::read(socket, in_buf).map(Loop::Continue)
    });

4
很不幸,在futures 0.3中似乎已经不存在loop_fn,需要注意。 - edrevo

0
一种不需要与类型系统斗争的清晰方法是使用tokio-codec crate;如果你想将读取器作为字节流进行交互而不是定义编解码器,你可以使用tokio_codec::BytesCodec
use tokio::codec::Decoder;
use futures::Stream;
...

let tokio::net::listener = TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    let (_writer, reader) = tokio_codec::BytesCodec::new().framed(socket).split();
    let read_task = reader.for_each(|bytes| {
            /* ... Logic I want to happen repeatedly as bytes are read ... */
    });
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接