如何使用Hyper在下载大文件时进行断点续传和错误恢复?

6
我想使用Hyper下载大文件(500MB),如果下载失败还能恢复下载。
是否可以使用Hyper在每个接收到的数据块上运行一些函数?send()方法返回一个Result<Response>,但我找不到Response上返回块迭代器的任何方法。理想情况下,我希望能够执行以下操作:
client.get(&url.to_string())
    .send()
    .map(|mut res| {
        let mut chunk = String::new();
        // write this chunk to disk
    });

这是否可能,还是map只会在hyper下载整个文件后被调用?

read 本身就是分块的,不是吗?你可以一次读取 N 字节。我不确定这是否对应于确切下载了 N 字节,还是只是缓冲读取。但只要你正确保存数据,这就不太重要了。 - Kroltan
在 Rust Cookbook 中有一个示例(https://rust-lang-nursery.github.io/rust-cookbook/web/clients/download.html#make-a-partial-download-with-http-range-headers)。它使用的是 reqwest,但对于 hyper 来说概念是相同的。 - peter
1个回答

8

有没有办法在Hyper中为每个接收到的数据块运行一些函数?

Hyper的Response实现了Read。这意味着Response是一个流,您可以像通常使用流一样读取任意大小的数据块。

就我而言,这是我从ICECat下载大文件时使用的代码片段。我正在使用Read接口来在终端上显示下载进度。

这里的变量response是Hyper的Response的一个实例。

{
    let mut file = try_s!(fs::File::create(&tmp_path));
    let mut deflate = try_s!(GzDecoder::new(response));

    let mut buf = [0; 128 * 1024];
    let mut written = 0;
    loop {
        status_line! ("icecat_fetch] " (url) ": " (written / 1024 / 1024) " MiB.");
        let len = match deflate.read(&mut buf) {
            Ok(0) => break,  // EOF.
            Ok(len) => len,
            Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return ERR!("{}: Download failed: {}", url, err),
        };
        try_s!(file.write_all(&buf[..len]));
        written += len;
    }
}

try_s!(fs::rename(tmp_path, target_path));
status_line_clear();

我想使用Hyper下载大文件(500mb),并且在下载失败时能够恢复下载。通常情况下,这是通过HTTP“Range”标头实现的(参见RFC 7233)。并不是每个服务器都支持“Range”标头。我见过很多带有自定义HTTP堆栈并且没有适当的“Range”支持或由于某些原因禁用“Range”标头的服务器。因此,跳过Hyper的响应块可能是必要的后备方案。但是,如果您想加快下载速度并节省流量,则应使用“Range”标头作为恢复停止下载的主要方法。

顺便提一句,使用Hyper 0.12时,Hyper返回的响应体是一个Stream,如果想对接收到的每个数据块运行某些函数,可以使用for_each流组合器:

extern crate futures;
extern crate futures_cpupool;
extern crate hyper; // 0.12
extern crate hyper_rustls;

use futures::Future;
use futures_cpupool::CpuPool;
use hyper::rt::Stream;
use hyper::{Body, Client, Request};
use hyper_rustls::HttpsConnector;
use std::thread;
use std::time::Duration;

fn main() {
    let url = "https://steemitimages.com/DQmYWcEumaw1ajSge5PcGpgPpXydTkTcqe1daF4Ro3sRLDi/IMG_20130103_103123.jpg";

    // In real life we'd want an asynchronous reactor, such as the tokio_core, but for a short example the `CpuPool` should do.
    let pool = CpuPool::new(1);
    let https = HttpsConnector::new(1);
    let client = Client::builder().executor(pool.clone()).build(https);

    // `unwrap` is used because there are different ways (and/or libraries) to handle the errors and you should pick one yourself.
    // Also to keep this example simple.
    let req = Request::builder().uri(url).body(Body::empty()).unwrap();
    let fut = client.request(req);

    // Rebinding (shadowing) the `fut` variable allows us (in smart IDEs) to more easily examine the gradual weaving of the types.
    let fut = fut.then(move |res| {
        let res = res.unwrap();
        println!("Status: {:?}.", res.status());
        let body = res.into_body();
        // `for_each` returns a `Future` that we must embed into our chain of futures in order to execute it.
        body.for_each(move |chunk| {println!("Got a chunk of {} bytes.", chunk.len()); Ok(())})
    });

    // Handle the errors: we need error-free futures for `spawn`.
    let fut = fut.then(move |r| -> Result<(), ()> {r.unwrap(); Ok(())});

    // Spawning the future onto a runtime starts executing it in background.
    // If not spawned onto a runtime the future will be executed in `wait`.
    // 
    // Note that we should keep the future around.
    // To save resources most implementations would *cancel* the dropped futures.
    let _fut = pool.spawn(fut);

    thread::sleep (Duration::from_secs (1));  // or `_fut.wait()`.
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接