使用 Task.WhenAll 和由 BlockingCollection 产生的无限 Tasks

7

我正在将后台任务添加到阻塞集合中(在后台中添加)。

我使用由GetConsumingEnumerable返回的可枚举对象上的Task.WhenAll进行等待。

我的问题是:接收IEnumerable的Task.WhenAll重载是否“准备好”可能接收无限数量的任务?

我不确定是否可以这样做,或者它是否应该这样使用?

private async Task RunAsync(TimeSpan delay, CancellationToken cancellationToken)
{
    using (BlockingCollection<Task> jobcollection = new BlockingCollection<Task>())
    {
        Task addingTask = Task.Run(async () =>
        {
            while (true)
            {
                DateTime utcNow = DateTime.UtcNow;
                var jobs = Repository.GetAllJobs();
                foreach (var job in GetRootJobsDue(jobs, utcNow))
                {
                    jobcollection.Add(Task.Run(() => RunJob(job, jobs, cancellationToken, utcNow), cancellationToken), cancellationToken);
                }

                await Task.Delay(delay, cancellationToken);
            }
        }, cancellationToken);

        await Task.WhenAll(jobcollection.GetConsumingEnumerable(cancellationToken));
    }
}

似乎使用 Parallel.Foreach(Repository.GetAllJobs,....) 将是一个更简单的解决方案。 - L.B
如果有无限数量的任务,它们永远不会全部完成,因为你永远无法获得整个集合。 - Servy
@rudimenter 在你回复之前,我编辑了我的评论。看看是否有意义? - Sriram Sakthivel
@Servy 这就是我想要的... 只有在 CancellationToken 被触发时才返回...(深入到其他代码中) - rudimenter
@rudimenter 如果不使用WhenAll,只需等待取消令牌即可。 - Servy
显示剩余5条评论
3个回答

7

由于你的目标仅是等待取消令牌被取消,你应该做那个。正如其他人所解释的原因,使用WhenAll处理无限任务序列并不是解决问题的方法。有更简单的方法来获取永远不会完成的任务。

await new TaskCompletionSource<bool>().Task
    .ContinueWith(t => { }, cancellationToken);

他的目标似乎是等到所有任务都完成,而不是等到它们全部被创建。 - Kyle W
这是怎么工作的?TaskCompletionSource<bool>()Task永远不会完成,这意味着ContinueWith永远不会执行。我猜你的意思是在任务上调用SetCancelled - Sriram Sakthivel
@SriramSakthivel 不,这个代码可以直接使用。当令牌被取消时,它将停止等待它所在的永无止境的任务完成。你的方法也可以,只是需要更多的代码行数。 - Servy
我明白了,太棒了。我没注意到。。+1 - Sriram Sakthivel
这也会使BlockingCollection变得无用...我可以在后台中简单地运行循环中的任务,只需等待添加任务。 - rudimenter

5
Task.WhenAll不能处理无限数量的任务。它会首先(同步地)等待可枚举对象完成,然后(异步地)等待它们全部完成。
如果你想以异步方式响应序列,则需要使用IObservable<Task>(响应式扩展)。你可以使用TPL Dataflow BufferBlock作为可以处理同步或异步代码的“队列”,并且容易转换为IObservable<Task>

并行.ForEach是否也会等到枚举完成后才停止? - rudimenter
@rudimenter Parallel.Foreach 是一种固有的同步操作,而不是异步操作。它不仅会同步等待序列完成,还会同步等待处理所有项目的完成。 - Servy
我非常确定Parallel在开始处理之前不会立即等待枚举,但您将“失去”一个线程来进行枚举。 - Stephen Cleary

0

我认为Task.WhenAll会尝试枚举集合,这意味着它本身将阻塞,直到集合完成或取消。如果没有这样做,那么代码理论上可以在创建任务之前完成await。所以在这里会有一个额外的块...它将阻塞等待线程被创建,然后再次阻塞直到任务完成。我认为对于你的代码来说这不是什么坏事,因为它仍然会阻塞到同一时间点。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接