如何在Rust中将Bytes Iterator转换为Stream

5

我正在尝试构建一个功能,需要将文件内容读入到futures::stream::BoxStream中,但我很难弄清楚我需要做什么。

我已经通过实现迭代器的 Bytes 方法解决了逐字节读取文件的问题。

use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, Bytes};

// TODO: Convert this to a async Stream
fn async_read() -> Box<dyn Iterator<Item = Result<u8, std::io::Error>>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);

    let iter = reader.bytes().into_iter();
    Box::new(iter)
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    for b in async_read().into_iter() {
        println!("{:?}", b);
    }
}

然而,我一直在尝试将这个 Box<dyn Iterator<Item = Result<u8, std::io::Error>>> 转换成一个 Stream

我本以为像这样就可以:

use futures::stream;
use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, Bytes};

// TODO: Convert this to a async Stream
fn async_read() -> stream::BoxStream<'static, dyn Iterator<Item = Result<u8, std::io::Error>>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);

    let iter = reader.bytes().into_iter();
    std::pin::Pin::new(Box::new(stream::iter(iter)))
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    while let Some(b) = async_read().poll() {
        println!("{:?}", b);
    }
}

但我一直遇到很多编译器错误,我已经尝试过其他排列组合,但通常都无法解决问题。

其中一个编译器错误:

std::pin::Pin::new
``` --> src/main.rs:14:24
   |
14 |     std::pin::Pin::new(Box::new(stream::iter(iter)))
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected trait object `dyn std::iter::Iterator`, found enum `std::result::Result`

有人有什么建议吗?

我对 Rust 和 Streams/低级别的东西还比较陌生,如果我有任何错误,请随时指正。

额外说明一下,我正在尝试实现这个功能,以便您可以在 nushell 命令中使用 CTRL-C 退出

1个回答

6
我觉得你把它复杂化了一些,你可以直接从 async_read 返回 impl Stream,不需要使用 box 或 pin(原始的基于 Iterator 的版本也是这样)。然后你需要设置一个异步运行时来轮询流(在这个示例中,我只是使用 futures::executor::block_on 提供的运行时)。然后你可以调用流上的 futures::stream::StreamExt::next() 来获取表示下一个项的 future。
以下是一种方法:
use futures::prelude::*;
use std::{
    fs::File,
    io::{prelude::*, BufReader},
};

fn async_read() -> impl Stream<Item = Result<u8, std::io::Error>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);
    stream::iter(reader.bytes())
}

async fn async_main() {
    while let Some(b) = async_read().next().await {
        println!("{:?}", b);
    }
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    futures::executor::block_on(async_main());
}

3
你是一位美丽的人,我爱你。 - Arash Outadi
@ArashOutadi 很高兴我能帮到你! :) - Coder-256

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接