我正在尝试构建一个服务,从SFTP服务器中拉取文件并上传到S3。
对于SFTP部分,我使用async-ssh2,它为我提供了一个实现
我的计划是在基于此处的代码的基础上,在
对于SFTP部分,我使用async-ssh2,它为我提供了一个实现
futures::AsyncRead
的文件处理器。由于这些SFTP文件可能非常大,因此我正在尝试将此File
处理器转换为ByteStream
以便使用Rusoto进行上传。看起来可以使用futures::Stream
初始化ByteStream
。我的计划是在基于此处的代码的基础上,在
File
对象上实现Stream
,以便与Rusoto兼容(下面是为了记录而复制的代码)。use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
pub struct ByteStream<R>(R);
impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
type Item = u8;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut buf = [0; 1];
match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
Ok(n) if n != 0 => Some(buf[0]).into(),
_ => None.into(),
}
}
}
这是一个好的方法吗?我看到了这个问题,但它似乎正在使用tokio::io::AsyncRead
。使用tokio
是处理此操作的规范方式吗?如果是,是否有一种方法可以将futures_io::AsyncRead
转换为tokio::io::AsyncRead
?
poll_next()
都会分配并清零 8MB 的缓冲区。最好避免这种开销,例如通过将buf
变成共享的线程本地缓冲区。另外,8MB 可能比你实际需要的要大得多。 - John KugelmanBytes :: from(buf:Vec <u8>)
需要拥有Vec
的所有权,而我们在从RefCell借用时没有它(因此需要克隆)。如果我们将buf设置为[u8]
,则在Bytes中唯一看到的方法是copy_from_slice
,它再次进行了复制。 - sonicxmlpoll_next
时,它们会在完成后返回对该缓冲区的引用。” - John Kugelman