如何在.NET 4.5中从并行任务中获取结果

9

我想在并行任务/等待中使用.NET迭代器。类似于这样:

IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    Parallel.ForEach(
        source,
        s=>
        {
            // Ordering is NOT important
            // items can be yielded as soon as they are done                
            yield return ExecuteOrDownloadSomething(s);
        }
}

很遗憾,.NET不能在本地处理此操作。迄今为止,@svick的最佳答案是使用AsParallel()。

额外奖励:是否有任何简单的async/await代码实现多个发布者和一个订阅者? 订阅者将会产生,而出版物将会被处理。(仅限核心库)

3个回答

11
这似乎是 PLINQ 的工作:
return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));

这将使用有限数量的线程并行执行委托,返回每个结果完成后的结果。

如果ExecuteOrDownloadSomething()方法是I/O绑定的(例如它实际上下载了某些内容)并且您不想浪费线程,则使用async-await可能是有意义的,但这会更加复杂。

如果您想充分利用async,则不应返回IEnumerable,因为它是同步的(即如果没有可用项,则会阻塞)。您需要一些异步集合,并且可以使用TPL Dataflow中的ISourceBlock(具体来说,是TransformBlock):

ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    var block = new TransformBlock<TSrc, TDest>(
        async s => await ExecuteOrDownloadSomethingAsync(s),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    return block;
}

如果源代码“缓慢”(即在迭代source完成之前您想要开始处理来自Foo()的结果),您可能希望将foreachComplete()调用移动到单独的Task中。更好的解决方案是将source也转换为ISourceBlock<TSrc>


谢谢,但您能举个例子说明如何使用async/await解决这个问题吗?谢谢! - Yuri Astrakhan
@Yurik,你能解释一下为什么你想要那个吗? - svick
主要是因为我觉得这会帮助我理解新的await语法,针对的不是“async 101”这样的基础问题,而是一个真实世界的场景。 - Yuri Astrakhan
标记为已接受,但主要是针对AsParallel()。我想我真正寻找的是使用async/await实现多个发布者+单个订阅者,并且仅使用核心库。但还是谢谢! - Yuri Astrakhan
@Yurik 你为什么关心一个库是核心库还是非核心库? - svick
因为如果使用核心库,它有助于学习新的基本概念/模式,而如果是一个不太知名的第三方库,它会让我依赖于该库,但并不帮助掌握核心语言。 - Yuri Astrakhan

1
所以看起来你真正想做的是基于任务完成时间来排序任务序列。这并不是非常复杂的操作:
public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
{
    var input = tasks.ToList();

    var output = input.Select(task => new TaskCompletionSource<T>());
    var collection = new BlockingCollection<TaskCompletionSource<T>>();
    foreach (var tcs in output)
        collection.Add(tcs);

    foreach (var task in input)
    {
        task.ContinueWith(t =>
        {
            var tcs = collection.Take();
            switch (task.Status)
            {
                case TaskStatus.Canceled:
                    tcs.TrySetCanceled();
                    break;
                case TaskStatus.Faulted:
                    tcs.TrySetException(task.Exception.InnerExceptions);
                    break;
                case TaskStatus.RanToCompletion:
                    tcs.TrySetResult(task.Result);
                    break;
            }
        }
        , CancellationToken.None
        , TaskContinuationOptions.ExecuteSynchronously
        , TaskScheduler.Default);
    }

    return output.Select(tcs => tcs.Task);
}

这里我们为每个输入任务创建一个TaskCompletionSource,然后遍历每个任务并设置一个继续项,该继续项从BlockingCollection中获取下一个完成源并设置其结果。第一个完成的任务获取返回的第一个tcs,第二个完成的任务获取返回的第二个tcs,以此类推。
现在你的代码变得非常简单:
var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item))
    .Order();
foreach(var task in tasks)
{
    var result = task.Result;//or you could `await` each result
    //....
}

谢谢,但我需要的是从一个方法中获取处理后对象的流。你提供的基本上是Parallel.ForEach()的重写。 - Yuri Astrakhan
@Yurik 如果您不需要等待所有项目完成,可以删除 WhenAll/WaitAll,但除此之外,我无法看出 Select 本身不能满足您的需求。您有一系列项目,并且想将其转换为任务序列,每个项目对应一个任务。Select(item=> LongRunningOperation(item)) 如何不符合您的需求,因为它返回一个任务序列? - Servy
在这种情况下,项目的顺序将与原始顺序相同,这可能效率低下。我不介意项目的无序产生。 - Yuri Astrakhan

0
在由微软机器人团队开发的异步库中,他们使用并发原语允许使用迭代器来生成异步代码。
这个库(CCR)是免费的(以前不是)。你可以在这里找到一篇很好的介绍文章:Concurrent affairs
也许你可以将这个库与.Net任务库一起使用,或者它会激发你自己动手实现一个类似的库。

你能解释一下在这里你将如何使用CCR吗? - svick
我引用的文章可以比我更好地解释它。如果您查看并检查图表:“Figure 6 SerialAsyncDemo”,它具有一个代码示例,几乎与OP所要求的完全相同:使用.Net迭代器进行异步操作以产生结果。我承认,尽管当时这种迭代器语法很聪明,但现在已经被async/await语法大多取代了。 - Toad

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接