Using async/await for multiple tasks

532
I'm using an API client that is completely asynchronous; that is, every operation returns either Task or Task<T>, for example:
static async Task DoSomething(int siteId, int postId, IBlogClient client)
{
    await client.DeletePost(siteId, postId); // call API client
    Console.WriteLine("Deleted post {0}.", postId);
}

Using the C# 5 async/await operators, what is the correct and most efficient way to start multiple tasks and wait for them all to complete:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

Or:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

Since the API client uses HttpClient internally, I would expect this to issue 5 HTTP requests immediately and write to the console as each one completes.


2
So what is the question? - Serg Shevchenko
5
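The problem is that his Parallel.ForEach is written incorrectly (see the answers). He is asking whether his attempts to run async code in parallel are correct, offers two candidate solutions, and wants to know which one is better (and presumably why). - AnorZaken
Since nobody has mentioned it, the breakfast analogy in the MSDN documentation is worth noting; it breaks all of this down: https://learn.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/ - markmnl
Related: Parallel foreach with asynchronous lambda
8 Answers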

719
int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());
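Although the code above runs the operations in parallel, it blocks every thread that an operation runs on. For example, if the network call takes 2 seconds, each thread hangs for 2 seconds doing nothing but waiting.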
int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

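On the other hand, the above code with WaitAll also blocks the threads, and your threads won't be free to process any other work until the operation ends.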

Recommended approach

I would prefer WhenAll, which performs your operations asynchronously and in parallel.

public async Task DoWork() {

    int[] ids = new[] { 1, 2, 3, 4, 5 };
    await Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}
In fact, in the case above you don't even need to await; since there is no continuation, you can return the task directly from the method:
public Task DoWork() 
{
    int[] ids = new[] { 1, 2, 3, 4, 5 };
    return Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}
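To back this up, here is a detailed blog post that goes through all the alternatives and their advantages and disadvantages: How and Where Concurrent Asynchronous I/O with ASP.NET Web API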
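As a minimal sketch of the recommended approach when the operations return Task<T>: the example below assumes a hypothetical SquareAsync method that stands in for an I/O-bound API call (it is not part of the question's IBlogClient) and uses Task.Delay to simulate latency. It illustrates that Task.WhenAll over Task<T> yields an array of results in the same order as the input tasks, and that the calls overlap rather than run one after another.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for an I/O-bound API call; Task.Delay simulates
    // latency without occupying a thread while it waits.
    public static async Task<int> SquareAsync(int id)
    {
        await Task.Delay(200);
        return id * id;
    }

    // Starts every call first, then awaits all of them together.
    public static async Task<int[]> SquareAllAsync(int[] ids)
    {
        Task<int>[] tasks = ids.Select(i => SquareAsync(i)).ToArray();
        return await Task.WhenAll(tasks); // results come back in input order
    }

    public static async Task Main()
    {
        var sw = Stopwatch.StartNew();
        int[] results = await SquareAllAsync(new[] { 1, 2, 3, 4, 5 });
        Console.WriteLine(string.Join(", ", results)); // 1, 4, 9, 16, 25
        Console.WriteLine("Elapsed: {0} ms", sw.ElapsedMilliseconds);
    }
}
```

The elapsed time should be close to a single delay rather than five delays added together, because all five calls are in flight before the first await on WhenAll.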

42
"The above code with WaitAll also blocks the threads": doesn't it only block one thread, the one that called WaitAll, rather than all of them? - Rawling
6
According to the documentation: "Type: System.Threading.Tasks.Task[], an array of Task instances on which to wait." So it blocks all threads. - Mixxiphoid
46
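The part you quoted does not mean it blocks all threads. It only blocks the calling thread while the supplied tasks are running. How those tasks actually run depends on the scheduler. Typically, after each task completes, the thread it was running on is returned to the pool. Each thread would not stay blocked until the others complete. - musaul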
4
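As I understand it, the only difference between the "classic" Task methods and their async counterparts is how they interact with threads between the moment a task starts running and the moment it finishes. Under the default scheduler, the classic method hogs a thread during that period (even while it is "sleeping"), whereas the async one does not. Outside of that period, that is, while the task is scheduled but not yet started, and after it has completed but its caller is still waiting, there is no difference. - musaul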
5
See https://dev59.com/4G025IYBdhLWcg3wQDdb#6123432. The difference is whether the calling thread is blocked or not; the rest is the same. You may want to edit the answer to clarify. - Răzvan Flavius Panda

66

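I was curious to see the results of the methods provided in the question as well as the accepted answer, so I put them to the test.

Here's the code: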

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
    class Program
    {
        class Worker
        {
            public int Id;
            public int SleepTimeout;

            public async Task DoWork(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart-testStart).TotalSeconds.ToString("F2"));
                await Task.Run(() => Thread.Sleep(SleepTimeout));
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd-workerStart).TotalSeconds.ToString("F2"), (workerEnd-testStart).TotalSeconds.ToString("F2"));
            }
        }

        static void Main(string[] args)
        {
            var workers = new List<Worker>
            {
                new Worker { Id = 1, SleepTimeout = 1000 },
                new Worker { Id = 2, SleepTimeout = 2000 },
                new Worker { Id = 3, SleepTimeout = 3000 },
                new Worker { Id = 4, SleepTimeout = 4000 },
                new Worker { Id = 5, SleepTimeout = 5000 },
            };

            var startTime = DateTime.Now;
            Console.WriteLine("Starting test: Parallel.ForEach...");
            PerformTest_ParallelForEach(workers, startTime);
            var endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WaitAll...");
            PerformTest_TaskWaitAll(workers, startTime);
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WhenAll...");
            var task = PerformTest_TaskWhenAll(workers, startTime);
            task.Wait();
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            Console.ReadKey();
        }

        static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
        {
            Parallel.ForEach(workers, worker => worker.DoWork(testStart).Wait());
        }

        static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
        {
            Task.WaitAll(workers.Select(worker => worker.DoWork(testStart)).ToArray());
        }

        static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
        {
            return Task.WhenAll(workers.Select(worker => worker.DoWork(testStart)));
        }
    }
}

And the resulting output:

Starting test: Parallel.ForEach...
Worker 1 started on thread 1, beginning 0.21 seconds after test start.
Worker 4 started on thread 5, beginning 0.21 seconds after test start.
Worker 2 started on thread 3, beginning 0.21 seconds after test start.
Worker 5 started on thread 6, beginning 0.21 seconds after test start.
Worker 3 started on thread 4, beginning 0.21 seconds after test start.
Worker 1 stopped; the worker took 1.90 seconds, and it finished 2.11 seconds after the test start.
Worker 2 stopped; the worker took 3.89 seconds, and it finished 4.10 seconds after the test start.
Worker 3 stopped; the worker took 5.89 seconds, and it finished 6.10 seconds after the test start.
Worker 4 stopped; the worker took 5.90 seconds, and it finished 6.11 seconds after the test start.
Worker 5 stopped; the worker took 8.89 seconds, and it finished 9.10 seconds after the test start.
Test finished after 9.10 seconds.

Starting test: Task.WaitAll...
Worker 1 started on thread 1, beginning 0.01 seconds after test start.
Worker 2 started on thread 1, beginning 0.01 seconds after test start.
Worker 3 started on thread 1, beginning 0.01 seconds after test start.
Worker 4 started on thread 1, beginning 0.01 seconds after test start.
Worker 5 started on thread 1, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

Starting test: Task.WhenAll...
Worker 1 started on thread 1, beginning 0.00 seconds after test start.
Worker 2 started on thread 1, beginning 0.00 seconds after test start.
Worker 3 started on thread 1, beginning 0.00 seconds after test start.
Worker 4 started on thread 1, beginning 0.00 seconds after test start.
Worker 5 started on thread 1, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.00 seconds after the test start.
Test finished after 5.00 seconds.

4
This would be more useful if you put the time on each of these results. - Serj Sagan
12
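@SerjSagan My initial idea was just to verify that the workers are started concurrently in each case, but I've added timestamps to improve the clarity of the test. Thanks for the suggestion. - RiaanDP
Thank you for the test. However, it feels a bit odd that you run Thread.Sleep on a thread separate from the "worker thread". It doesn't matter in this case, but wouldn't it make more sense to Task.Run the worker if we are simulating computational work, or just use Task.Delay instead of sleep if we are simulating I/O? Just checking what your thoughts are on that. - AnorZaken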
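Following the suggestion in the comment above, here is a sketch of an I/O-style worker that awaits Task.Delay instead of wrapping Thread.Sleep in Task.Run. The DelayWorker class and the RunAllAsync helper are assumptions made for illustration; they only loosely mirror the Worker class in the test program and are not the answer author's code.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public class DelayWorker
{
    public int Id;
    public int DelayMs;

    // Simulates I/O: no thread is occupied while the delay is pending.
    public async Task<int> DoWorkAsync()
    {
        await Task.Delay(DelayMs);
        return Id;
    }
}

public static class DelayWorkerDemo
{
    // Runs all workers concurrently and returns the elapsed wall-clock milliseconds.
    public static async Task<long> RunAllAsync(IEnumerable<DelayWorker> workers)
    {
        var sw = Stopwatch.StartNew();
        await Task.WhenAll(workers.Select(w => w.DoWorkAsync()));
        return sw.ElapsedMilliseconds;
    }

    public static async Task Main()
    {
        var workers = Enumerable.Range(1, 5)
            .Select(i => new DelayWorker { Id = i, DelayMs = i * 100 })
            .ToList();
        long elapsed = await RunAllAsync(workers);
        // Expect roughly the longest delay (about 500 ms), not the 1500 ms sum.
        Console.WriteLine("All workers finished after {0} ms.", elapsed);
    }
}
```

With this shape, no thread pool thread sits in Thread.Sleep, which is closer to how a real HttpClient call behaves while a request is in flight.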

33

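You can use the Task.WhenAll function and pass it any number of tasks. Task.WhenAll returns a new task that completes when all of the supplied tasks have completed. Be sure to await Task.WhenAll asynchronously to avoid blocking your UI thread: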

public async Task DoSomethingAsync() {
    Task[] tasks = new Task[numTasks];
    for (int i = 0; i < numTasks; i++)
    {
        tasks[i] = DoChildTaskAsync();
    }
    await Task.WhenAll(tasks);
    // Code here will execute on UI thread
}

26

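Since the API you're calling is async, the Parallel.ForEach version doesn't make much sense. You shouldn't use .Wait in the WaitAll version, since that would lose the parallelism. Another alternative, if the caller is async, is to use Task.WhenAll after doing Select and ToArray to generate the array of tasks. A second alternative is to use Rx 2.0.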
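One reason to call ToArray after Select is that a LINQ query is lazy: each enumeration runs the selector again and therefore starts the operations again. The sketch below illustrates this with a hypothetical WorkAsync method that counts how often it is started; the class and method names are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MaterializeSketch
{
    private static int _started;

    // Hypothetical async operation that counts how many times it is started.
    private static async Task WorkAsync(int id)
    {
        Interlocked.Increment(ref _started);
        await Task.Delay(10);
    }

    // Materializes the tasks once; each operation starts exactly once.
    public static async Task<int> StartsWithToArrayAsync(int[] ids)
    {
        _started = 0;
        Task[] tasks = ids.Select(i => WorkAsync(i)).ToArray();
        await Task.WhenAll(tasks);
        await Task.WhenAll(tasks); // same task objects, nothing restarts
        return _started;
    }

    // Keeps the lazy query; every enumeration calls WorkAsync again.
    public static async Task<int> StartsWithLazyQueryAsync(int[] ids)
    {
        _started = 0;
        IEnumerable<Task> lazy = ids.Select(i => WorkAsync(i));
        await Task.WhenAll(lazy);
        await Task.WhenAll(lazy); // second enumeration starts the work a second time
        return _started;
    }

    public static async Task Main()
    {
        int[] ids = { 1, 2, 3, 4, 5 };
        Console.WriteLine(await StartsWithToArrayAsync(ids));   // 5
        Console.WriteLine(await StartsWithLazyQueryAsync(ids)); // 10
    }
}
```

Passing the lazy query to Task.WhenAll once is fine, since WhenAll enumerates it a single time; the array matters when the tasks are inspected or awaited again afterwards.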


13
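Parallel.ForEach requires a list of user-defined workers and a non-async Action to perform with each worker. Task.WaitAll and Task.WhenAll require a List<Task>, and tasks are asynchronous by definition.
I found RiaanDP's answer here very useful for understanding the difference, but the Parallel.ForEach part needs a correction. I don't have enough reputation to comment on it, so I am writing my own answer.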
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
    class Program
    {
        class Worker
        {
            public int Id;
            public int SleepTimeout;

            public void DoWork(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
                Thread.Sleep(SleepTimeout);
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
            }

            public async Task DoWorkAsync(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
                await Task.Run(() => Thread.Sleep(SleepTimeout));
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
            }
        }

        static void Main(string[] args)
        {
            var workers = new List<Worker>
            {
                new Worker { Id = 1, SleepTimeout = 1000 },
                new Worker { Id = 2, SleepTimeout = 2000 },
                new Worker { Id = 3, SleepTimeout = 3000 },
                new Worker { Id = 4, SleepTimeout = 4000 },
                new Worker { Id = 5, SleepTimeout = 5000 },
            };

            var startTime = DateTime.Now;
            Console.WriteLine("Starting test: Parallel.ForEach...");
            PerformTest_ParallelForEach(workers, startTime);
            var endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WaitAll...");
            PerformTest_TaskWaitAll(workers, startTime);
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WhenAll...");
            var task = PerformTest_TaskWhenAll(workers, startTime);
            task.Wait();
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            Console.ReadKey();
        }

        static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
        {
            Parallel.ForEach(workers, worker => worker.DoWork(testStart));
        }

        static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
        {
            Task.WaitAll(workers.Select(worker => worker.DoWorkAsync(testStart)).ToArray());
        }

        static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
        {
            return Task.WhenAll(workers.Select(worker => worker.DoWorkAsync(testStart)));
        }
    }
}

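Below is the resulting output. The execution times are comparable. I ran this test while my computer was doing its weekly antivirus scan. Changing the order of the tests did change their execution times.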

Starting test: Parallel.ForEach...
Worker 1 started on thread 9, beginning 0.02 seconds after test start.
Worker 2 started on thread 10, beginning 0.02 seconds after test start.
Worker 3 started on thread 11, beginning 0.02 seconds after test start.
Worker 4 started on thread 13, beginning 0.03 seconds after test start.
Worker 5 started on thread 14, beginning 0.03 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.02 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.02 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.03 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.03 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.03 seconds after the test start.
Test finished after 5.03 seconds.

Starting test: Task.WaitAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

Starting test: Task.WhenAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

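That's very interesting. Removing ".Wait()" from the parallel workers makes them run at the same speed as the others. Likewise, PerformTest_ParallelForEach doesn't return too early; it still waits for the workers to complete. It's not clear to me why, with 5 separate threads in parallel, they don't all run in parallel and finish within one second. Am I missing something? - David Pierson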
1
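The difference between my answer and the previous one is that I pass different types of work to the iterators. I pass DoWork (a non-async Action) to Parallel.ForEach, and DoWorkAsync (async Task) to Task.WaitAll and Task.WhenAll. Parallel.ForEach expects an Action. Adding .Wait() to DoWorkAsync turns it into one, but that prevents concurrency, which is not what we want. - JPortillo
Thanks, that makes a lot of sense. My mistake: they do run in parallel in Parallel.ForEach, but they have different SleepTimeout values. I overlooked that. - David Pierson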

8
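This question is 10 years old, and the OP was asking about C# 5.
As of today there is one more option: the Parallel.ForEachAsync method, introduced in .NET 6.
Here is an example based on the OP's code: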
int[] ids = new[] { 1, 2, 3, 4, 5 };
await Parallel.ForEachAsync(ids, async (i,token) => await DoSomething(1, i, blogClient));

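This is fully asynchronous and doesn't block any threads.

Moreover, it may be preferable to the Task.WaitAll and Task.WhenAll approaches, because they don't limit the number of threads running in parallel. So if you have a huge array, it can eat up all your available RAM. Parallel.ForEachAsync lets you specify the degree of parallelism, like so: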

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

await Parallel.ForEachAsync(ids, options, async (i,token) => await DoSomething(1, i, blogClient));

This way you only have 4 threads running in parallel.
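To see the limit in action, the following sketch counts how many operations are in flight at once under a given MaxDegreeOfParallelism. It assumes .NET 6 or later; the helper name MaxObservedConcurrencyAsync is hypothetical, and Task.Delay stands in for the real API call.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Runs a simulated async operation per id and reports the highest number
    // of operations that were in flight at the same time.
    public static async Task<int> MaxObservedConcurrencyAsync(int[] ids, int maxDegree)
    {
        var gate = new object();
        int inFlight = 0;
        int maxSeen = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };

        await Parallel.ForEachAsync(ids, options, async (id, token) =>
        {
            lock (gate)
            {
                inFlight++;
                if (inFlight > maxSeen) maxSeen = inFlight;
            }

            await Task.Delay(50, token); // hypothetical stand-in for the API call

            lock (gate)
            {
                inFlight--;
            }
        });

        return maxSeen;
    }

    public static async Task Main()
    {
        int[] ids = Enumerable.Range(1, 20).ToArray();
        int peak = await MaxObservedConcurrencyAsync(ids, 4);
        Console.WriteLine("Peak concurrent operations: {0}", peak); // never more than 4
    }
}
```

The counter tracks concurrent operations, not threads: while an operation is awaiting Task.Delay it holds no thread at all.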


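"This way you only have 4 threads running in parallel." More precisely, asynchronous flows, not threads. The number of threads in use at any given moment is most likely not 4.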

5

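All the answers are for running the same function.

The following code works for calling different functions. Just put your regular Task.Run() calls inside an array and call Task.WhenAll():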

await Task.WhenAll(new Task[] { 
    Task.Run(() => Func1(args)),
    Task.Run(() => Func2(args))
});
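When the different functions are already asynchronous and return values of different types, a common variation is to start each task, await Task.WhenAll, and then read each result. The sketch below uses two hypothetical methods, GetCountAsync and GetNameAsync, which are assumptions for illustration and do not appear in the answer.

```csharp
using System;
using System.Threading.Tasks;

public static class MixedTasksSketch
{
    // Hypothetical async operations with different result types.
    public static async Task<int> GetCountAsync()
    {
        await Task.Delay(50);
        return 42;
    }

    public static async Task<string> GetNameAsync()
    {
        await Task.Delay(50);
        return "blog";
    }

    public static async Task<string> LoadBothAsync()
    {
        // Start both operations before awaiting either, so they overlap.
        Task<int> countTask = GetCountAsync();
        Task<string> nameTask = GetNameAsync();

        await Task.WhenAll(countTask, nameTask);

        // Both tasks have completed, so these awaits return immediately.
        int count = await countTask;
        string name = await nameTask;
        return name + ":" + count;
    }

    public static async Task Main()
    {
        Console.WriteLine(await LoadBothAsync()); // blog:42
    }
}
```

Task.Run is not needed here because the methods are already asynchronous; it is mainly useful for moving CPU-bound or synchronous work onto the thread pool.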

How are this answer and this answer any more "complex" than what you're doing here? They await Task.WhenAll() in just the same way. - Lance U. Matthews

1
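To add to all the great answers above: if you are writing a library, it is good practice to use ConfigureAwait(false), which can improve performance (the original answer links to an article with details).
So this snippet seems to be better: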
 public static async Task DoWork() 
 {
     int[] ids = new[] { 1, 2, 3, 4, 5 };
     await Task.WhenAll(ids.Select(i => DoSomething(1, i))).ConfigureAwait(false);
 }

A complete fiddle is linked from the original answer.

1
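That's true, but I doubt the OP is writing a library. It looks more likely that they are writing application code, where ConfigureAwait just clutters the code and gets in the way, offering practically no performance gain in return. - Theodor Zoulias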
2
You are right, but I think it's an important point in this discussion. - Ygalbel
