将IEnumerable<Task<T>>转换为IObservable<T>

7
我正在尝试使用响应式扩展(Rx)在任务完成时缓存任务枚举。有没有人知道是否有一种干净的内置方法可以做到这一点?ToObservable 扩展方法只会创建一个 IObservable<Task<T>>,而这不是我想要的。我想要一个IObservable<T>,然后我可以在其上使用Buffer

虚构的例子:

//Method designed to be awaitable
public static Task<int> makeInt()
{
     return Task.Run(() => 5);
}

//In practice, however, I don't want to await each individual task
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{
    //Make a bunch of tasks
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt());

    //Is there a built in way to turn this into an Observable that I can then buffer?
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //????

    buffered.Subscribe(ints => {
        Console.WriteLine(ints.Count()); //Should be 15
    });
}

https://dev59.com/QGvXa4cB1Zd3GeqPGScz - Mauricio Scheffer
1个回答

9

你可以使用Task可以通过另一个ToObservable()重载将其转换为可观察对象。

当你有一组(单个项)可观察对象时,你可以使用Merge()创建一个包含这些项的单一可观察对象,以便在它们完成时返回。

因此,你的代码可能看起来像这样:

futureInts.Select(t => t.ToObservable())
          .Merge()
          .Buffer(15)
          .Subscribe(ints => Console.WriteLine(ints.Count));

有没有什么简单的方法可以将 Task<IEnumerable<T>> 转换为 IObservable<T>,并且 Task<T> 的结果成为 IObservable<T> 中的结果流? - JonathanPeel
1
@JonathanPeel 我认为 task.ToObservable().SelectMany(x => x) 应该可以工作。 - svick

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接