在异步环境中共享资源的高效方法是什么?

4
想象一下有几个任务同时尝试使用资源池。资源池中的单个资源只能被特定数量的任务同时使用;这个数字可以是一。
在同步环境中,我认为 WaitHandle.WaitAny & Semaphore 是正确的选择。
var resources = new[] { new Resource(...), new Resource(...) }; // 'Resource' custom class wrapers the resource
var semaphores = new[] { new Semaphore(1, 1), new Semaphore(1, 1) };
... 
var index = WaitHandle.WaitAny(semaphores);

try
{
    UseResource(resources[index]);
}
finally
{
    semaphores[index].Release();
}

但是在异步环境下,我们应该怎么做呢?


2
@AndreasNiedermair 你确定吗?你有读过《面向 Stack Overflow 用户的代码审查指南》吗?我个人认为这个问题在 Stack Overflow 上比在 Code Review 上更合适。 - Simon Forsberg
@SimonAndréForsberg 我之前不知道“请勿使用自定义原因投票关闭问题,认为它应该在Code Review上发布”的口号 - 谢谢你,我会撤回我的投票。对于表格中的其他要点,我有些犹豫 - 我的意图是:让社区决定...特别是“看起来不太自然”和“是否有更好的方法”表明了相当多的建议(即使标题有点偏离...)这些建议非常适合CR。 - user57508
@drowa:你有哪些资源可以允许多个用户,但不是任意数量的用户? - Stephen Cleary
@drowa:此外,您的代码显示了一组在编译时已知的资源,并且不能添加更多的能力;这是否是您实际需要的? - Stephen Cleary
@StephenCleary:我故意省略了我的用例细节,以避免陷入讨论与主题无关的细节中。话虽如此,我的实际资源池是一组运行银行核心系统的服务器集群。这是一个非常专业和老派(读作大型机)的数据库系统。协议是通过TCP/IP进行的,并且是无状态的(就像HTTP)。服务器上的每个端口一次只接受一个客户端,并且每个服务器仅有一个可用端口。我的应用程序是一种代理/多路复用器,位于这个旧系统和几个现代中间件之间。 - drowa
显示剩余3条评论
4个回答

7

我通常建议开发人员将“池化”逻辑与“使用”逻辑分开。这种分离的一个好处是只有池化逻辑需要同步。

在实际情况中,资源数量将在运行时得知;更确切地说,它将在应用程序初始化时(即配置)确定。

服务器上的每个端口一次只接受一个客户端,每个服务器只有一个可用端口。

因此,您拥有有限的资源集,并且每个资源一次只能由一个线程使用。

由于您无法按需创建新资源,因此需要一个信号来知道何时有资源可用。您可以自己完成此操作,或者可以使用类似于 BufferBlock<T> 的东西作为异步就绪队列。

由于每个资源一次只能由一个线程使用,我建议使用常见的 IDisposable 技术将资源释放回池中。

将这些组合在一起:

public sealed class Pool
{
  private readonly BufferBlock<Resource> _block = new BufferBlock<Resource>();

  public Pool()
  {
    _block.Post(new Resource(this, ...));
    _block.Post(new Resource(this, ...));
  }

  public Resource Allocate()
  {
    return _block.Receive();
  }

  public Task<Resource> AllocateAsync()
  {
    return _block.ReceiveAsync();
  }

  private void Release(Resource resource)
  {
    _block.Post(resource);
  }

  public sealed class Resource : IDisposable
  {
    private readonly Pool _pool;
    public Resource(Pool pool, ...)
    {
      _pool = pool;
      ...
    }

    public void Dispose()
    {
      _pool.Release(this);
    }
  }
}

使用方法:

using (var resource = Pool.Allocate())
    UseResource(resource);

或者:

using (var resource = await Pool.AllocateAsync())
    await UseResourceAsync(resource);

我之前完全不知道System.Threading.Tasks.Dataflow这个命名空间(我需要一些时间来消化这个答案)。我的应用程序还有一个要求,就是某些特殊任务(每个服务器一个),能够等待池中的特定服务器(而不是任何服务器)。这些特殊任务会定期执行握手请求,以防止由于不活动而关闭连接。您知道BufferBlock<T>是否可以提供这个功能吗?(是的,我的应用程序与服务器之间的连接始终保持打开状态)。 - drowa
@drowa:BufferBlock<T>并不提供这种功能。它实际上更像是一个异步兼容的生产者/消费者队列。如果心跳消息可以与常规消息交错,我会考虑让资源本身处理心跳消息。 - Stephen Cleary
很遗憾,心跳消息不能交错发送,因为它们是普通消息(一种NOP指令)。服务器一次只能处理一个请求/响应。在收到上一个请求的响应后,才能发送新的请求。 - drowa
@drowa:有趣...而且复杂...我会使用流进行建模,其中每个资源都有一个数据请求流,并且还有一个位于它们之前的单独流,充当负载均衡器的作用。你可以使用TPL Dataflow(一个BufferBlock链接到一组BufferBlock)来实现这一点;Rx是另一个选择。 - Stephen Cleary
在我看来,这并不是很复杂。如果处于同步环境中,Semaphore 将满足所有的要求。 - drowa
1
@drowa:只有在您依赖于未记录的行为细节时,例如“WaitOne”仅会更改单个同步原语的状态,才会发生这种情况。做出这样的假设最终会让您登上陈纳德的博客。 - Stephen Cleary

3
  1. 将“WaitAny”样式的逻辑封装到一个辅助程序中。这使得代码感觉自然。这样可以消除混乱。通常情况下,由于使用了await,异步代码在结构上可以与同步版本完全相同。
  2. 关于性能,这应该比同步等待句柄执行得更好,因为同步等待需要内核模式转换和上下文切换。确保不依赖于异常来控制流程(例如取消),因为那样会非常慢(每个异常大约需要0.1微秒)。

还有任何疑虑吗?请留言。


我的性能问题更多地关注于我们将不必要地获取锁,而这在同步版本中(至少表面上)不会发生。另一个问题是,如果我决定取消其他等待,以确保ContinueWith创建的任务在我开始使用资源之前完成,那么我如何避免控制流异常? - drowa
取消的任务会转换为已取消状态,就这样。只是不要等待/结果/等待它们,以免出现异常。如果需要等待它们,可以使用.ContinueWith(_ => { }, ExecuteSync)来抑制异常。我希望框架能够支持等待而不抛出异常。 - usr
嗯,WaitAsync 似乎在内部使用异常。这可能会导致性能不佳。也许你需要实现自己的信号量,当你知道如何做时,这并不难。参考源代码是可用的。 - usr

0

我可能会选择在对象上使用互斥锁。 任何一种都可以强制线程等待,直到锁定或互斥锁被释放。


0

以下是异步版本 Task.WhenAny & SemaphoreSlim

var resources = new[] { new Resource(...), new Resource(...) }; // 'Resource' custom class wrapers the resource
var semaphores = new[] { new SemaphoreSlim(1, 1), new SemaphoreSlim(1, 1) };
...
var waits = new[] { semaphores[0].WaitAsync(), semaphores[1].WaitAsync() };

var index = Array.IndexOf(waits, await Task.WhenAny(waits));

// The wait is still running - perform compensation.
if (index == 0)
    waits[1].ContinueWith(_ => semaphores[1].Release());
else if (index == 1)
    waits[0].ContinueWith(_ => semaphores[0].Release());

try
{
    await UseResourceAsync(resources[index]);
}
finally
{
    semaphores[index].Release();
}

它似乎不像同步版本那样自然(和高效)。有更好的方法吗? - drowa
引用来自 SemaphoreSlim 源代码头部的注释:_“一个轻量级的信号量类,包含基本的信号量函数以及一些有用的函数,如中断和等待句柄公开,以允许在多个信号量上等待”_。这个注释是否暗示了 SemaphoreSlim 的作者认为这种方法不好? - drowa

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接