如何使用reqwest执行并行异步HTTP GET请求?

60
异步示例很有用,但对于 Rust 和 Tokio 新手来说,我很难弄清楚如何同时进行 N 个请求,使用一个 URL 向量,并创建每个 URL 的响应 HTML 字符串的迭代器。如何实现呢?
2个回答

162

并发请求

截至reqwest 0.11.14版本:

use futures::{stream, StreamExt}; // 0.3.27
use reqwest::Client; // 0.11.14
use tokio; // 1.26.0, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}

stream::iter(urls)

stream::iter

将一个字符串集合转换成Stream

.map(|url| {

StreamExt :: map

在流中的每个元素上运行异步函数并将元素转换为新类型。

let client = &client;
async move {

明确引用 Client 并将该引用(而非原始的 Client)移入匿名异步块中。

let resp = client.get(url).send().await?;

使用Client的连接池启动异步GET请求并等待请求完成。

resp.bytes().await

请求并等待响应的字节。

.buffer_unordered(N);

StreamExt::buffer_unordered

将一个 future 流转换为该 future 值的流,同时并发执行这些 futures。

bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each

将流转换回单个future,打印出沿途接收到的数据量,然后等待future完成。

另请参阅:

没有有界执行

如果您希望的话,还可以将一个迭代器转换为一个 future 迭代器,并使用 future::join_all

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

我建议使用第一个示例,因为通常希望限制并发性,这是bufferbuffer_unordered所帮助的。

并行请求

并发请求通常已经足够好了,但有时您需要并行请求。在这种情况下,您需要启动一个任务。

use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}

主要区别如下:

  • 我们使用 tokio::spawn 在单独的任务中执行工作。
  • 我们必须为每个任务提供自己的 reqwest::Client。根据推荐, 我们克隆共享客户端以利用连接池。
  • 当任务无法加入时,还会出现其他错误情况。

另请参阅:


@Nawaz 请查看 Stream::buffer_unordered 的文档:如果此流的项目可以转换为未来对象。另请参阅Rust futures中thenand_thenor_else之间的区别是什么? - Shepmaster
1
根据我在这篇文章中发现的这个gist,我认为这个例子只会并发而不会并行。该文是从这篇文章中找到的。 - Alex Moore-Niemi
1
@AlexMoore-Niemi 好的,更新了。 - Shepmaster
1
@Shepmaster 我可能假设太多了,但看起来Client有一个内部Arc。这是否意味着克隆它将克隆Arc并允许您重用底层ClientRef的连接池(或某些巫术)? - Mark Lodato
1
@MarkLodato 是的,这就是文档推荐的做法:*客户端在内部持有一个连接池,因此建议您创建一个并重复使用它。您不必将Client包装在Rc或Arc中以重用它,因为它已经在内部使用了Arc。* - Shepmaster
显示剩余15条评论

-1
如果您的问题允许的话,我建议使用std async和rayon。它们现在都很成熟,并且非常容易上手,因为std中有async{/*代码在此*/}作用域边界。您还可以通过特性集成进入/并行使用tokio https://docs.rs/async-std/1.10.0/async_std/#features

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接