有许多关于如何在F#中执行异步任务的示例,例如:
[dowork 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously
但我该如何异步地等待只获取第一个结果呢?
例如,如果我想运行一些并行查找任务,并且在获取第一个成功结果时希望进行更深入的搜索。(基本上相当于C#世界中的 Task.WhenAny
...)
有许多关于如何在F#中执行异步任务的示例,例如:
[dowork 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously
但我该如何异步地等待只获取第一个结果呢?
例如,如果我想运行一些并行查找任务,并且在获取第一个成功结果时希望进行更深入的搜索。(基本上相当于C#世界中的 Task.WhenAny
...)
我会使用类似下面的代码:
let any asyncs =
async {
let t =
asyncs
|> Seq.map Async.StartAsTask
|> System.Threading.Tasks.Task.WhenAny
return t.Result.Result }
WhenAny
的结果传递到 Async.AwaitTask
中,使用 let! t
然后返回 t.Result
如何? - nphxAsync.Choice
可以像 Async.Parallel
一样嵌入到任何异步工作流中。可选的输出类型表示子计算可能在没有令人满意的结果的情况下完成。open FSharp.Control
let getOneOrOther () =
let queue = BlockingQueueAgent(1)
let async1 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
do! queue.AsyncAdd(1) } |> Async.Start
let async2 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
do! queue.AsyncAdd(2) } |> Async.Start
queue.Get()
for i in 1..10 do
printfn "%d" <| getOneOrOther ()
Console.ReadLine () |> ignore
System.Collections.Concurrent
也包括一个阻塞队列,但接口略差一些。Seq<unit -> Async<'T>>
,并返回第一个结果,并取消所有其他结果。open FSharp.Control
open System.Threading
let async1 () = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 1 }
let async2 () = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 2 }
let getFirst asyncs =
let queue = BlockingQueueAgent(1)
let doWork operation = async {
let! result = operation()
do! queue.AsyncAdd(result) }
let start work =
let cts = new CancellationTokenSource()
Async.Start(work, cts.Token)
cts
let cancellationTokens =
asyncs
|> Seq.map doWork
|> Seq.map start
let result = queue.Get()
cancellationTokens
|> Seq.iter (fun cts -> cts.Cancel(); cts.Dispose())
result
for i in 1..10 do
printfn "%A" <| getFirst [async1;async2]
Console.ReadLine () |> ignore
基于事件的另一种实现:
let Choice (asyncs: seq<Async<'T>>) : Async<'T> =
async {
let e = Event<'T>()
let cts = new System.Threading.CancellationTokenSource()
do Async.Start(
asyncs
|> Seq.map (fun a -> async { let! x = a in e.Trigger x })
|> Async.Parallel
|> Async.Ignore,
cts.Token)
let! result = Async.AwaitEvent e.Publish
cts.Cancel()
return result
}
let any (list: Async<'T>[])=
let tcs = new TaskCompletionSource<'T>()
list |> Array.map (fun wf->Async.Start (async{
let! res=wf
tcs.TrySetResult (res) |> ignore
}))
|> ignore
Async.AwaitTask tcs.Task
let async1 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 1 }
let async2 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 2 }
printfn "%d" <| ([|async1;async2|] |> any |> Async.RunSynchronously)
Async.Choice
。
https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Choice
Async.Choice
。 - Tarmil