Tokio选择宏条件和ARM评估

4
我有一段代码需要从多个TcpStream中读取。其中一个TcpStream将始终存在,但另一个可能存在也可能不存在。
我之前写过这段代码,并且(天真地)在tokio select宏上使用了条件。不幸的是,我很快就发现该宏将首先评估arm(以获取future),然后才会检查条件,并根据此条件启动或跳过future。
结果,当我运行此代码时,我将得到以下错误信息: thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', src/main.rs:16:35
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() {
    let mut stream1 : TcpStream = TcpStream::connect("info.cern.ch:80").await.unwrap();
    let mut stream2 : Option<TcpStream> = None;
    let mut buf = vec![0_u8; 1024];
    let mut buf2 = vec![0_u8; 1024];
    
    tokio::select! {
        result = stream1.read(&mut buf) => {
            // do something here
        }
        
        result = stream2.as_mut().unwrap().read(&mut buf2), if stream2.is_some() => {
            // do somethihng here
        }
    }
    
}

我遇到的问题是如何处理这个问题? 我看到了以下几个解决方法, 但都不尽人意: - 添加if语句。如果stream2.is_some(),则使用带有两个分支的select,否则使用只有一个标记的select。这种方法可以解决问题,但会导致代码重复,使代码难以看懂。 - 我尝试了FutureUnordered方案。然而,考虑到上面的代码将在循环中执行,所以我放弃了这种方式,因为它不喜欢我在循环中尝试借用可变的事物。 - 我考虑过使用“假”的TcpStream来消除Option<>部分。但是,这看起来也很丑陋。 我正在寻找一种方法,使我能够保留对Option的select!操作。

1
看起来这种行为已经改变了。旧版本的文档如下:“此外,每个分支可能包括一个可选的 if 前提条件。该前提条件在 <async expression> 之前进行评估。如果前提条件返回 false,则整个分支将被禁用。”但是当前版本如下:“此外,每个分支可能包括一个可选的 if 前提条件。如果前提条件返回 false,则该分支将被禁用。提供的 <async expression> 仍然会被评估,但是生成的 future 永远不会被轮询。” - cdhowie
这对我来说有点奇怪,我很好奇为什么他们要进行这个更改。我觉得旧版本的行为更加合理。 - cdhowie
@cdhowie 我从未使用过旧版本。但是,我同意旧行为更有道理的说法。 - Victor Ronin
1个回答

3

您可以将可选流的处理封装到另一个future中,可能返回自定义错误。然后在成功读取时进行模式匹配:

use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use std::io::{Error, ErrorKind};

#[tokio::main]
async fn main() {
    let mut stream1 : TcpStream = TcpStream::connect("info.cern.ch:80").await.unwrap();
    let mut stream2 : Option<TcpStream> = None;
    let mut buf = vec![0_u8; 1024];
    let mut buf2 = vec![0_u8; 1024];
    
    let read2 = async move {
        match stream2 {
            Some(mut stream) => stream.read(&mut buf2).await,
            _ => Err(Error::new(ErrorKind::Other, "No available stream")),
        }
    };
    
    tokio::select! {
        result = stream1.read(&mut buf) => {
            // do something here
        }
        
        Ok(result) = read2 => {
            // do somethihng here
        }
    }
    
}

Playground

如果您不喜欢自定义错误,可以将所有内容重新包装在Option<Result>中,并对所有内容进行模式匹配:

...
    let read2 = async move {
        match stream2 {
            Some(mut stream) => Some(stream.read(&mut buf2).await),
            None => None,
        }
    };
    
    tokio::select! {
        result = stream1.read(&mut buf) => {
            // do something here
        }
        
        Some(result) = read2 => {
            // do somethihng here
        }
    }
...

游乐场


1
谢谢@Netwave。这正是我在寻找的东西。我出去散步时也有类似的想法,回来看到你已经提供了完整的解决方案,真是太棒了。 - Victor Ronin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接