如何连接异步可枚举对象?

12

我有一个返回类型为:

public async Task<IEnumerable<T>> GetAll()

它会进行进一步的异步调用(数量未知),每个调用都返回一个可枚举的T类型任务,然后希望将结果合并以返回。
var data1 = src1.GetAll();
var data2 = src2.GetAll();
var data3 = src3.GetAll(); //and so on

现在很容易等待所有结果并连接它们以生成单个可枚举对象,但我希望在第一次调用返回时就能够使用可枚举对象,如果有任何调用仍在等待可用结果,则需要为调用者/枚举器提供潜在的等待。
我是否必须手动编写连接操作,以解决在任务<>中包装时缺乏枚举器支持的问题?或者TPL或其他地方已经有库调用可以帮助我。我确实看过IX,但它仍处于实验版本,并不想将其合并进来。
另外,我正在尝试的是反模式吗?我可以想到一个复杂性,即异常处理 - 从调用者的角度来看,调用可能成功完成并开始使用可枚举对象,但在其中间可能会出现错误...

(顺便提一下)您的模式似乎是不对称的:代码“等待”第一个序列准备就绪,但不等待其余部分。 - Vlad
5
也许你真的需要使用 IObservable<T> 接口?这种情况下,你可以尝试使用 Observable.Concat 方法。 - Vlad
@Vlad:说得好。当我研究Ix/Rx时,我确实考虑过这一点,并且同意,响应式确实更适合这种用例。但是整体情况和应用程序是基于拉取的,我不想只为了从可观察结果返回到可枚举对象而使用Rx。 - Vivek
1个回答

7

现有一个名为 Async Enumerable 的项目,正好能解决这个问题。

你可以轻松地使用它。

例如:

IAsyncEnumerable<string> GetAsyncAnswers()
{
    return AsyncEnum.Enumerate<string>(async consumer =>
    {
        foreach (var question in GetQuestions())
        {
            string theAnswer = await answeringService.GetAnswer(question);
            await consumer.YieldAsync(theAnswer);
        }
    });
}

这里公开了一个IAsyncEnumerable<string>,它在GetAnswer返回时产生一次迭代。在你的情况下,你可以在内部公开一个IAsyncEnumerable<T>,并在GetAll内部进行调用。

我是否试图使用反模式?我能想到一个复杂点的问题, 异常处理——从调用者的角度来看,调用可能会成功完成并开始使用可枚举项,但可能会在中途出现问题...

我不这样认为。这确实存在潜在的问题,例如在等待期间出现内部异常,但这也可能发生在任何IEnumerable<T>中。异步序列是当今不断涌现的异步API现实所需的东西。

1
谢谢。标记为答案。我没有导入库,但最终还是通过一个类似于你示例中的连接器,在两行代码中自己实现了它,该连接器采用一个函数来获取下一个可枚举项。如果我有更广泛的用例,也许我会再次查看这个库。 - Vivek

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接