Task.WhenAll()一次只执行两个线程?

5
在这个问题中,我正在尝试缓存一个单一的值,我们称其为foo。如果该值没有被缓存,那么检索需要一段时间。
我的问题不在于实现它,而是在于测试它。
为了测试它,我使用Task.WhenAll()同时启动5个任务来获取缓存值。第一个任务异步进入锁并检索值,而其他4个线程应该等待锁。等待后,他们应该逐个重新检查缓存的值,并发现它已经被第一个缓存它的线程检索过,然后返回它而不会再次检索。
[TestClass]
public class Class2
{
    private readonly Semaphore semaphore = new Semaphore(1, 1);

    private bool? foo;

    private async Task<bool> GetFoo()
    {
        bool fooValue;

        // Atomic operation to get current foo
        bool? currentFoo = this.foo;

        if (currentFoo.HasValue)
        {
            Console.WriteLine("Foo already retrieved");
            fooValue = currentFoo.Value;
        }
        else
        {
            semaphore.WaitOne();
            {
                // Atomic operation to get current foo
                currentFoo = this.foo;

                if (currentFoo.HasValue)
                {
                    // Foo was retrieved while waiting
                    Console.WriteLine("Foo retrieved while waiting");
                    fooValue = currentFoo.Value;
                }
                else
                {
                    // Simulate waiting to get foo value
                    Console.WriteLine("Getting new foo");
                    await Task.Delay(TimeSpan.FromSeconds(5));
                    this.foo = true;
                    fooValue = true;
                }
            }
            semaphore.Release();
        }

        return fooValue;
    }

    [TestMethod]
    public async Task Test()
    {
        Task[] getFooTasks = new[] {
            this.GetFoo(),
            this.GetFoo(),
            this.GetFoo(),
            this.GetFoo(),
            this.GetFoo(),
        };
        await Task.WhenAll(getFooTasks);
    }
}

在我的实际测试和生产代码中,我通过接口检索值,并使用Moq模拟该接口。在测试结束时,我验证接口只被调用了1次(通过),而不是>1次(失败)。

输出:

Getting new foo
Foo retrieved while waiting
Foo already retrieved
Foo already retrieved
Foo already retrieved

然而,从测试的输出结果可以看出,它并不像我期望的那样。看起来只有2个线程同时执行,而其他线程则等待前两个完成后才能进入GetFoo()方法。
为什么会发生这种情况?是因为我在VS单元测试中运行它吗?请注意,我的测试仍然通过了,但不是我期望的方式。我怀疑在VS单元测试中有一些限制线程数量的限制。
3个回答

6

Task.WhenAll() 并不会 启动 任务 - 它只是在 等待 它们完成。

同样地,调用一个 async 方法实际上并没有强制并行 - 它不会引入新的线程或者其他东西。只有当你:

  • 等待某些尚未完成的内容,而你的同步上下文将继续在新线程上调度(在 WinForms 上下文中它不会这样做,例如;它只会重用 UI 线程)
  • 显式使用 Task.Run,并且任务调度程序创建一个新的线程来运行它。(当然,它可能不需要这样做。)
  • 显式启动一个新的线程。

老实说,在异步方法中使用 阻塞Semaphore 方法让我感觉非常错误。你似乎并没有真正理解异步的思想...我还没有尝试分析你的代码将要做什么,但我认为你需要更多地了解如何使用 async 以及如何最好地使用它。


这(调用异步方法并不会强制并行化)对我来说很有意义……我正在使用信号量来解决无法在lock()块中使用await的问题,而且我明白这是一件坏事,因为可能会发生死锁。我将研究其他解决方案。但至少这解释了为什么测试没有按预期执行。看起来,如果我正确实现,测试将正常工作。 - Ryan Burbidge
那么,在lock()中使用await的首选模型是什么? - Ryan Burbidge
1
@user3339997:请查看http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx。 - Jon Skeet

2
您的问题似乎出在 semaphore.WaitOne() 上。
异步方法会在遇到第一个 await 之前同步运行。在您的代码中,第一个 await 是在 WaitOne 被触发后才会出现的。方法被标记为 async 并不意味着它通常会在多个线程上运行,相反地,通常是单线程运行。
要解决这个问题,请使用 SemaphoreSlim.WaitAsync,这样调用线程将放弃控制权,直到信号量完成为止。
public class Class2
{
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

    private bool? foo;

    private async Task<bool> GetFoo()
    {
        bool fooValue;

        // Atomic operation to get current foo
        bool? currentFoo = this.foo;

        if (currentFoo.HasValue)
        {
           Console.WriteLine("Foo already retrieved");
           fooValue = currentFoo.Value;
        }
       else
       {
           await semaphore.WaitAsync();
           {
               // Atomic operation to get current foo
               currentFoo = this.foo;

               if (currentFoo.HasValue)
               {
                  // Foo was retrieved while waiting
                   Console.WriteLine("Foo retrieved while waiting");
                   fooValue = currentFoo.Value;
               }
              else
              {
                  // Simulate waiting to get foo value
                  Console.WriteLine("Getting new foo");
                  await Task.Delay(TimeSpan.FromSeconds(5));
                  this.foo = true;
                  fooValue = true;
              }
          }
        semaphore.Release();
    }

    return fooValue;
}

你将John的回答标记为解决方案,但它并没有为你的问题提供解决方案。 - Yuval Itzchakov
我认为他的答案稍微更正确,因为它不涉及在信号量内使用await。他提供了一个链接,说明如何在await中正确锁定,并提供了更多关于我的问题的背景信息。你的答案也很有用,所以我点了赞。 - Ryan Burbidge
他提供了一个链接到AsyncLock,它与在SemaphoreSlim上等待并没有太大的区别。他在回答中谈到不使用阻塞信号量。无论如何,你可以自由选择任何你想要的 :) - Yuval Itzchakov

1
 await Task.Delay(TimeSpan.FromSeconds(5));

这应该允许其他任务运行,但我怀疑它们被阻塞在:

semaphore.WaitOne();
混合并发样式(在本例中使用任务和手动控制与同步对象)很难做到正确无误。
(您似乎正在尝试通过多个并发任务汇聚来获得相同的值:这似乎过度了。)
默认情况下,.NET将限制Task并发性为可用的(逻辑)CPU核心数,我怀疑您的系统有两个。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接