异步等待n件事情发生

3

我正在寻找类似于Semaphore的东西,但在所有插槽都释放后解析。

像这样的东西:

use semaphore = new SemaphoreSlim(0,100)

anEvent.add(fun _ -> semaphore.Release(1) |> ignore);

async {
   do! thingThatCausesAnEventToFire100Times()
   //where 100 is the available slots instead of the timeout.
   let! thingsHappened = semaphore.WaitAsync(100) |> Async.AwaitTask
   thingsHappened |> should be True
}

哦,如果你知道C#的答案也可以 :P - albertjan
信号量和其他东西可能会在异步(不同的线程)方面出现问题,因此作为第一次尝试,我会选择简单的锁定/引用计数/手动重置事件。 - Random Dev
@Carsten 信号量是用于异步/多线程使用的。从SemaphoreSlim的MSDN中可以看到:“表示轻量级的替代信号量,它限制了能够同时访问资源或资源池的__线程__数量。” - albertjan
但是 Semaphore 并不能解决我的问题 :)。对于 refcount 锁 manualresetevent 路径,我需要状态。这是我迄今为止成功避免的 :)。 - albertjan
这就是为什么我写了“可能” - 如果信号量工作,我会非常仔细地阅读每个文档:很好 - 但例如监视器在异步上下文中可能会失败,其中您在中间进行线程切换:https://msdn.microsoft.com/en-us/library/system.threading.monitor.exit(v=vs.110).aspx - Random Dev
状态问题...你的插槽计数器将始终是一个状态。 - Random Dev
1个回答

3
听起来需要使用一个MailboxProcessor。可以这样做:
type SemaphoreCommand =
  |Release
  |Wait of AsyncReplyChannel<unit>

let semaphore slots =
    Agent.Start
    <| fun inbox ->
        let rec loop c (w:AsyncReplyChannel<unit> list) =
          async {
            let! command = inbox.Receive()
            match command with
            |Release -> if (c + 1) = slots then w |> List.iter(fun t -> t.Reply()) 
                        else return! loop (c + 1) w
            |Wait a -> return! loop c (a::w)
        }
        loop 0 []

let slotWaiter = semaphore 100

//Events will fill up slots
Release |> slotWaiter.Post
Release |> slotWaiter.Post

async{
  //Wait for all slots to be filled
  do! slotWaiter.PostAndAsyncReply(fun t -> Wait t)
  //All slots filled - continue
}

我没有处理这种情况,即在所有插槽都被填充或重置后,您可能尚未注册AsyncReplyChannel,但这相当简单,我将其留给读者作为练习 :)


谢谢,我会看看这个有多简单 : )。我之前没有想到用 MailBoxProcessor,但它看起来很适合。 - albertjan
1
w = [] 时,w |> List.iter(fun t -> t.Reply()) 是一个无操作,因此没有理由考虑缺失的 AsyncReplyChannel。取消可以轻松实现,因为 MailboxProcessorIDisposable,所以您可以使用 use slotAwaiter = semphore 100 - albertjan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接