如何将futures_io::AsyncRead转换为rusoto::ByteStream?

3
我正在尝试构建一个服务,从SFTP服务器中拉取文件并上传到S3。
对于SFTP部分,我使用async-ssh2,它为我提供了一个实现futures::AsyncRead的文件处理器。由于这些SFTP文件可能非常大,因此我正在尝试将此File处理器转换为ByteStream以便使用Rusoto进行上传。看起来可以使用futures::Stream初始化ByteStream
我的计划是在基于此处的代码的基础上,在File对象上实现Stream,以便与Rusoto兼容(下面是为了记录而复制的代码)。
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};

pub struct ByteStream<R>(R);

impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0; 1];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
            Ok(n) if n != 0 => Some(buf[0]).into(),
            _ => None.into(),
        }
    }
}

这是一个好的方法吗?我看到了这个问题,但它似乎正在使用tokio::io::AsyncRead。使用tokio是处理此操作的规范方式吗?如果是,是否有一种方法可以将futures_io::AsyncRead转换为tokio::io::AsyncRead

1个回答

1
这是我进行转换的方式。我基于上面的代码,但使用了更大的缓冲区(8 KB)以减少网络调用次数。
use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
use futures_io::AsyncRead;
use rusoto_s3::StreamingBody;

const KB: usize = 1024;

struct ByteStream<R>(R);

impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = Result<Bytes, std::io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = vec![0_u8; 8 * KB];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf[..])) {
            Ok(n) if n != 0 => Some(Ok(Bytes::from(buf))).into(),
            Ok(_) => None.into(),
            Err(e) => Some(Err(e)).into(),
        }
    }
}

允许我这样做:
fn to_streamingbody(body: async_ssh2::File) -> Option<StreamingBody> {
    let stream = ByteStream(body);
    Some(StreamingBody::new(stream))
}

(请注意,rusoto::StreamingBodyrusoto::ByteStream 是别名)

2
请注意,每次调用 poll_next() 都会分配并清零 8MB 的缓冲区。最好避免这种开销,例如通过将 buf 变成共享的线程本地缓冲区。另外,8MB 可能比你实际需要的要大得多 - John Kugelman
哦,太酷了!我一直在尝试做那个,但不知道如何避免编译器错误而不使用unsafe块。还有谢谢你给的另一个链接,我会进行相应的更改。 - sonicxml
已更新为使用8 KB缓冲区大小;但是,我不确定如何在不复制的情况下使thread_local工作(这可能接近必须提出一个新问题)。Bytes :: from(buf:Vec <u8>)需要拥有Vec的所有权,而我们在从RefCell借用时没有它(因此需要克隆)。如果我们将buf设置为[u8],则在Bytes中唯一看到的方法是copy_from_slice,它再次进行了复制。 - sonicxml
1
转念一想,如果没有GATs,这可能是不可能的:“cramertj提出的主要问题是,与Iterator一样,[Stream]总是将每个项的所有权归还给其调用者...实际上,许多流/迭代器实现如果它们可以拥有一些内部存储器,它们会更有效率,反复重复使用。例如,它们可能具有内部缓冲区,当调用poll_next时,它们会在完成后返回对该缓冲区的引用。” - John Kugelman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接