在Task.Run()中使用异步/等待操作

25

Task.Run(()=>{}) 将动作委托放入队列并返回任务。 在 Task.Run() 中使用 async/await 有什么好处吗? 我理解需要使用 Task.Run(),因为如果我们想直接使用 await,那么调用方法就必须变成异步并影响调用位置。

以下是一个示例代码,其中在 Task.Run() 中有异步等待。完整示例可在此处找到: 创建预计算任务

Task.Run(async () => { await new WebClient().DownloadStringTaskAsync("");});

另一种方法是:

Task.Run(() => new WebClient().DownloadStringTaskAsync("").Result;);

由于Task.Run()和Await都会将工作排队,并由线程池选择,所以Task.Run()内的async/await是否有点多余?


这个问题最好在另一个帖子中回答:异步和非异步的区别 - user2616989
5个回答

20

在 Task.Run() 中使用 async/await 是否有好处?

有好处。 Task.Run 在线程池线程上运行某些操作。如果此类操作执行一些 IO 工作并通过 await 异步等待 IO 操作完成,则系统可以在 IO 操作仍在运行时将此线程池线程用于其他工作。

例如:

Task.Run( async () =>
{
    DoSomeCPUIntensiveWork();

    // While asynchronously waiting for this to complete, 
    // the thread is given back to the thread-pool
    var io_result = await DoSomeIOOperation(); 

    DoSomeOtherCPUIntensiveWork(io_result);
});

让我们在聊天中继续这个讨论。点击此处进入聊天室 - Chandan
谢谢你的出色回答。是否有任何微软官方文档可以证实这一点? - Alex from Jitbit

10
在 Task.Run() 中使用 async/await 有什么好处吗?
当一个异步方法遇到第一个操作未完成的 await 时,它会立即返回给调用者。如果这个方法的第一个执行“连续运行”需要很长时间,那么 Task.Run 将改变其行为:它会使方法立即返回并在线程池上执行该第一个“连续运行”。
这在 UI 场景中非常有用,因为这样你就可以确保不会阻塞 UI。例如,HttpWebRequest 即使使用了其中一个异步方法(这基本上是库错误/设计错误),也会同步执行 DNS 解析。这可能会暂停 UI 线程。因此,您可以使用 Task.Run 来确保 UI 永远不会被阻塞超过几微秒。
回到最初的问题:为什么要在 Task.Run 主体内使用 await?出于与正常情况下使用 await 相同的原因:解除线程阻塞。

你为什么要在Task.Run中包装async/await代码?!因为例如我需要在同步方法中运行异步代码,而这个方法可能无法更改或更改所有调用者太过冒险。所以 - David
2
@David 在这种情况下,通常只需调用返回的任务上的Wait/Result即可。 - usr

2
在您提供的示例中,主线程被阻塞直到异步操作完成。它被调用Wait()而被阻塞(顺便说一下,这通常是个坏主意)。
让我们看一下链接示例中DownloadStringAsync返回的内容:
return Task.Run(async () =>
{
    content = await new WebClient().DownloadStringTaskAsync(address);
    cachedDownloads.TryAdd(address, content);
    return content;
});

为什么要将这个方法包装在一个Task中呢?先考虑一下你的选择。如果你不想将其包装在一个Task中,那么你该如何确保该方法返回一个Task<string>并仍然正常工作呢?当然,你可以将该方法标记为async!但是,如果你将方法标记为async并调用Wait,很可能会出现死锁,因为主线程正在等待工作完成,而你阻塞了主线程,所以它无法让你知道它已经完成。
当将一个方法标记为async时,状态机将在调用线程上运行,在你的示例中,状态机在单独的线程上运行,这意味着主线程几乎没有任何工作。

2

在非异步方法中调用异步方法

当我们试图在非异步方法中调用异步方法时,我们会做一些类似于这样的事情。特别是如果异步方法是已知的。我们使用更多的TaskFactory...符合一个模式,使调试更容易,确保每个人采取相同的方法(并且——如果异步->同步开始出现问题,让我们只有一个喉咙可以控制)。

所以,你的例子

想象一下,在你的例子中,你有一个非异步函数。而且,在那个函数内部,你需要调用await webClient.DoSomethingAsync()。你不能在不是异步的函数中调用await——编译器不会让你这么做。

选项1:僵尸感染

你的第一个选择是沿着调用堆栈向上爬,标记沿途的每个方法为异步,并添加等待到处都是。到处都是。就像到处都是一样。然后——因为所有这些方法现在都是异步的,你需要使引用它们的方法全部异步。

顺便说一句,这可能是许多SO爱好者会提倡的方法。因为,“阻塞线程是一个坏主意。”

所以。是的。这个“让异步成为异步”的方法意味着你的小型库例程获取JSON对象,刚刚触及了80%的代码。谁会给CTO打电话并让他知道呢?

选项2:只和一个僵尸一起走

或者,你可以将你的异步封装在像你的函数一样的函数中...

return Task.Run(async () => {
     content = await new WebClient().DoSomethingAsync();
     cachedDownloads.TryAdd(address, content);
     return content;
});

完成了...僵尸感染已经被限制在一小部分代码中。如何/为什么会在CPU级别执行,让位于位运算专家进行争论。我并不关心这些。我关心的是没有人需要向CTO解释为什么整个库现在必须100%异步(或类似的事情)。


-3

确认,使用Task.Run包装await会使用两个线程而不是一个。

Task.Run(async () => { //thread #1
 await new WebClient().DownloadStringTaskAsync(""); //thread #2
});

假设有四个被包装的调用,它将使用 4 x 2 = 8 个线程。

最好改为使用简单的 await 调用。例如:

Task<byte[]> t1 = new WebClient().DownloadStringTaskAsync("");
Task<byte[]> t2 = new WebClient().DownloadStringTaskAsync("");
byte[] t1Result = await t1;
byte[] t2Result = await t2;

这里有证据表明包装的 Task.Run 使用了额外的线程。(不使用 WebClient 来证明这一点)

private static async Task wrapped()
{
    List<Task> tasks = new List<Task>();
    tasks.AddRange(new []
    {
        Task.Run(async() => await new MyThread().RunMe()),
        Task.Run(async() => await new MyThread().RunMe()),
        Task.Run(async() => await new MyThread().RunMe()),
        Task.Run(async() => await new MyThread().RunMe()),
    });

    Thread.Sleep(1000);
    int number = Process.GetCurrentProcess().Threads.Count;
    Console.WriteLine($"While running thread count: {number}");

    await Task.WhenAll(tasks);
}

enter image description here

未包装

private static async Task unwrapped()
{
    List<Task> tasks = new List<Task>();
    Task<int> t1 = new MyThread().RunMe();
    Task<int> t2 = new MyThread().RunMe();
    Task<int> t3 = new MyThread().RunMe();
    Task<int> t4 = new MyThread().RunMe();
    tasks.AddRange(new[] {t1, t2, t3, t4});

    Thread.Sleep(1000);
    int number = Process.GetCurrentProcess().Threads.Count;
    Console.WriteLine($"While running thread count: {number}");

    int i1 = await t1;
    int i2 = await t2;
    int i3 = await t3;
    int i4 = await t4;
}

enter image description here

完整的 POC 代码在这里

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncThreadDemo
{
    class Program
    {
        static async Task Main(string[] args)
        {
            int number = Process.GetCurrentProcess().Threads.Count;
            Console.WriteLine($"Init thread count: {number}");


            //await wrapped();
            await unwrapped();

            number = Process.GetCurrentProcess().Threads.Count;
            Console.WriteLine($"Done thread count: {number}");

            Console.ReadLine();
        }

        private static async Task wrapped()
        {
            List<Task> tasks = new List<Task>();
            tasks.AddRange(new []
            {
                Task.Run(async() => await new MyThread().RunMe()),
                Task.Run(async() => await new MyThread().RunMe()),
                Task.Run(async() => await new MyThread().RunMe()),
                Task.Run(async() => await new MyThread().RunMe()),
            });

            Thread.Sleep(1000);
            int number = Process.GetCurrentProcess().Threads.Count;
            Console.WriteLine($"While running thread count: {number}");

            await Task.WhenAll(tasks);
        }

        private static async Task unwrapped()
        {
            List<Task> tasks = new List<Task>();
            Task<int> t1 = new MyThread().RunMe();
            Task<int> t2 = new MyThread().RunMe();
            Task<int> t3 = new MyThread().RunMe();
            Task<int> t4 = new MyThread().RunMe();
            tasks.AddRange(new[] {t1, t2, t3, t4});

            Thread.Sleep(1000);
            int number = Process.GetCurrentProcess().Threads.Count;
            Console.WriteLine($"While running thread count: {number}");

            int i1 = await t1;
            int i2 = await t2;
            int i3 = await t3;
            int i4 = await t4;
        }
    }

    public class MyThread
    {
        public static int _counter;

        public async Task<int> RunMe()
        {
            await Task.Run(() =>
            {
                for (int i = 0; i < 2; ++i)
                {
                    Thread.Sleep(1000);
                    Console.WriteLine($"T{Thread.CurrentThread.ManagedThreadId} {i}");
                }
                Console.WriteLine($"T{Thread.CurrentThread.ManagedThreadId} done");
            });
            return _counter++;
        }
    }
}

Jeson yeap,根据StackOverflow规则,答案现在是可以的,但本质上并不是一个好的答案。Task.Run方法不会生成新线程,而是重用ThreadPool线程。对于大多数情况下,WebClient.DownloadStringTaskAsync使用零个线程ThreadPool仅被利用了很短的时间,因此单个线程可能能够完成所有所需的工作。 - Theodor Zoulias
@TheodorZoulias 单词 spawn 已更名为 use。同时添加了 POC。希望这能证明这一点。在 Task.Run 中包装异步调用会导致不必要的开销。 - Jeson Martajaya
我无法重现你的概念证明程序的结果。在我的电脑上,使用 await wrapped();await unwrapped(); 输出为:Done thread count: 14。我在Fiddle上得到了类似的结果。我添加了ThreadPool.ThreadCount的日志记录以保证完整性。顺便提一句,你的概念证明与原始的 WebClient 示例并不相同。它阻塞了一个线程,而不是调用真正的异步操作。请使用await Task.Delay(1000);来使概念证明保持相关(在Fiddle上试试)。 - Theodor Zoulias
看起来这个问题是与.NET版本有关的。我在.NET框架4.7.2、VS2019和Windows机器上拥有PoC,并且运行没有问题。然而,在.NET框架4.7.2中,.NET Fiddle在调用Proess.GetCurrentProcess时会抛出站点安全错误。请尝试在本地机器上使用4.7.2下的PoC代码运行。https://dotnetfiddle.net/plS0ka此外,为了公平起见,在MyThread类中,请使用Thread.Sleep(1000)而不是await Task.Delay(1000)。后者更有效率,因为它在等待时释放线程。前者则会占用线程。 - Jeson Martajaya
Jeson,你的 PoC 观察到的是 ThreadPool 类的行为,这个类从 .NET Framework 到 .NET Core 可能已经发生了变化。它的行为也取决于 CPU 核心的数量,我猜测你的机器核心数超过了 4 个。仅仅比较不同时刻的线程数并不能说明程序到目前为止消耗的总处理能力。而且坚持使用 Task.Run(() => Thread.Sleep()) 模拟 WebClient.DownloadStringTaskAsync 方法会使你的答案与问题无关。前者方法不会让线程休眠。 - Theodor Zoulias
@TheodorZoulias,你是否尝试在VS2019、.NET Framework 4.7.2中直接运行我的PoC?这应该能解决异步包装是否有害的争论。最好的情况是它有微不足道的好处,最坏的情况是每次调用都会使用额外的线程,正如POC所展示的那样。不需要再用言语争论了。 - Jeson Martajaya

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接