将有限并发性与期货合并

7

我有一个大的Hyper HTTP请求future向量,想要将它们解决成结果向量。由于有最大打开文件数的限制,我想将并发限制为N个futures。

我尝试过Stream::buffer_unordered,但似乎它是一个接一个地执行futures。


2
你能发一下你已经有的代码吗? - Peter Hall
请编辑您的问题,解释为什么您说“似乎它一个接一个地执行了未来”。我已经使用buffer_unordered来实现这个确切的目的,并且它对我起作用了。 - Shepmaster
1个回答

5
我们在一个项目中使用了类似于这样的代码,以避免打开过多的TCP套接字。这些future中包含了Hyper的future,因此看起来情况完全一样。
// Convert the iterator into a `Stream`. We will process
// `PARALLELISM` futures at the same time, but with no specified
// order.
let all_done =
    futures::stream::iter(iterator_of_futures.map(Ok))
    .buffer_unordered(PARALLELISM);

// Everything after here is just using the stream in
// some manner, not directly related

let mut successes = Vec::with_capacity(LIMIT);
let mut failures = Vec::with_capacity(LIMIT);

// Pull values off the stream, dividing them into success and
// failure buckets.
let mut all_done = all_done.into_future();
loop {
    match core.run(all_done) {
        Ok((None, _)) => break,
        Ok((Some(v), next_all_done)) => {
            successes.push(v);
            all_done = next_all_done.into_future();
        }
        Err((v, next_all_done)) => {
            failures.push(v);
            all_done = next_all_done.into_future();
        }
    }
}

这段代码用到了事件循环(core),因此需要显式地驱动它。通过观察程序使用的文件句柄数量,我们发现它被限制了。在添加这个瓶颈之前,我们很快就用完了允许的文件句柄,而之后则没有。


使用StreamExt::fold可以更简单地累积成功/失败(并符合函数式/迭代器风格)。 - nirvana-msu
@nirvana-msu 可能吧。请注意,这个答案是来自2017年的,早于稳定的async/await语法,并且使用的是futures 0.1,而不是0.3。自那时以来发生了很多变化。 - Shepmaster

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接