异步示例很有用,但对于 Rust 和 Tokio 新手来说,我很难弄清楚如何同时进行 N 个请求,使用一个 URL 向量,并创建每个 URL 的响应 HTML 字符串的迭代器。如何实现呢?
截至reqwest 0.11.14版本:
use futures::{stream, StreamExt}; // 0.3.27
use reqwest::Client; // 0.11.14
use tokio; // 1.26.0, features = ["macros"]
const CONCURRENT_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = stream::iter(urls)
.map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
})
.buffer_unordered(CONCURRENT_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
})
.await;
}
stream::iter(urls)
将一个字符串集合转换成Stream
。
.map(|url| {
在流中的每个元素上运行异步函数并将元素转换为新类型。
let client = &client; async move {
明确引用 Client
并将该引用(而非原始的 Client
)移入匿名异步块中。
let resp = client.get(url).send().await?;
使用Client
的连接池启动异步GET请求并等待请求完成。
resp.bytes().await
请求并等待响应的字节。
.buffer_unordered(N);
将一个 future 流转换为该 future 值的流,同时并发执行这些 futures。
bodies
.for_each(|b| {
async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
})
.await;
将流转换回单个future,打印出沿途接收到的数据量,然后等待future完成。
另请参阅:
如果您希望的话,还可以将一个迭代器转换为一个 future 迭代器,并使用 future::join_all
:
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = future::join_all(urls.into_iter().map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
}))
.await;
for b in bodies {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
}
我建议使用第一个示例,因为通常希望限制并发性,这是buffer
和buffer_unordered
所帮助的。
并发请求通常已经足够好了,但有时您需要并行请求。在这种情况下,您需要启动一个任务。
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]
const PARALLEL_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let urls = vec!["https://api.ipify.org"; 2];
let client = Client::new();
let bodies = stream::iter(urls)
.map(|url| {
let client = client.clone();
tokio::spawn(async move {
let resp = client.get(url).send().await?;
resp.bytes().await
})
})
.buffer_unordered(PARALLEL_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(Ok(b)) => println!("Got {} bytes", b.len()),
Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
}
})
.await;
}
主要区别如下:
tokio::spawn
在单独的任务中执行工作。reqwest::Client
。根据推荐, 我们克隆共享客户端以利用连接池。另请参阅:
Stream::buffer_unordered
的文档:如果此流的项目可以转换为未来对象。另请参阅Rust futures中then
,and_then
和or_else
之间的区别是什么? - Shepmaster