F#如何运行多个异步任务并等待第一个完成的结果?

10

有许多关于如何在F#中执行异步任务的示例,例如:

[dowork 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously

但我该如何异步地等待只获取第一个结果呢?

例如,如果我想运行一些并行查找任务,并且在获取第一个成功结果时希望进行更深入的搜索。(基本上相当于C#世界中的 Task.WhenAny ...)


你是期望其他任务被中止,还是让它们在后台继续运行? - mavnn
我不想中止它们,但将来可能要将此方法扩展为类似于WaitForAnySuccessfullAndFaiIfAllFail的东西。我是f#的新手。在scala世界中,使用futures和promises很容易实现这一点。 - Aleksander Kois
2
值得注意的是,自从 F# 4.5 版本以来,这个功能已经内置为 Async.Choice - Tarmil
6个回答

7
我建议使用以下内容:

我会使用类似下面的代码:

let any asyncs =
    async {
        let t = 
            asyncs
            |> Seq.map Async.StartAsTask
            |> System.Threading.Tasks.Task.WhenAny
        return t.Result.Result }

1
这不会阻塞吗?将 WhenAny 的结果传递到 Async.AwaitTask 中,使用 let! t 然后返回 t.Result 如何? - nphx
@nphx:你能否将你的建议作为一个新答案添加或使用gist呢? - knocte

5
下面的代码片段提供了一个通用解决方案:http://fssnip.net/dN Async.Choice 可以像 Async.Parallel 一样嵌入到任何异步工作流中。可选的输出类型表示子计算可能在没有令人满意的结果的情况下完成。

我很惊讶Async中没有包含这样的东西。感谢提供链接。 - Leaf Garland
1
@LeafGarland 现在已经包含在这里:https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Choice - Goswin Rothenthal

3
我能想到的最简单的实现方式如下所示:
open FSharp.Control

let getOneOrOther () =
    let queue = BlockingQueueAgent(1)
    let async1 = async {
            do! Async.Sleep (System.Random().Next(1000, 2000))
            do! queue.AsyncAdd(1) } |> Async.Start
    let async2 = async {
            do! Async.Sleep (System.Random().Next(1000, 2000))
            do! queue.AsyncAdd(2) } |> Async.Start

    queue.Get()

for i in 1..10 do
    printfn "%d" <| getOneOrOther ()

Console.ReadLine () |> ignore

它依赖于来自FSharpx项目的阻塞队列实现,您可能因其他原因而需要此项目。但如果您不想依赖任何其他软件包,则System.Collections.Concurrent也包括一个阻塞队列,但接口略差一些。
对于带有取消功能的通用版本,下面的版本使用Seq<unit -> Async<'T>>,并返回第一个结果,并取消所有其他结果。
open FSharp.Control
open System.Threading

let async1 () = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 1 }
let async2 () = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 2 }

let getFirst asyncs =
    let queue = BlockingQueueAgent(1)
    let doWork operation = async { 
            let! result = operation()
            do! queue.AsyncAdd(result) }
    let start work =
        let cts = new CancellationTokenSource()
        Async.Start(work, cts.Token)
        cts
    let cancellationTokens =
        asyncs
        |> Seq.map doWork
        |> Seq.map start
    let result = queue.Get()
    cancellationTokens
    |> Seq.iter (fun cts -> cts.Cancel(); cts.Dispose())
    result

for i in 1..10 do
    printfn "%A" <| getFirst [async1;async2]

Console.ReadLine () |> ignore

3

基于事件的另一种实现:

let Choice (asyncs: seq<Async<'T>>) : Async<'T> =
    async {
        let e = Event<'T>()
        let cts = new System.Threading.CancellationTokenSource()
        do Async.Start(
            asyncs
            |> Seq.map (fun a -> async { let! x = a in e.Trigger x })
            |> Async.Parallel
            |> Async.Ignore,
            cts.Token)
        let! result = Async.AwaitEvent e.Publish
        cts.Cancel()
        return result
    }

1
似乎这个解决方案足够简单,非阻塞且适用于我的情况。
let any (list: Async<'T>[])=
    let tcs = new TaskCompletionSource<'T>()

    list |> Array.map (fun wf->Async.Start (async{
                let! res=wf
                tcs.TrySetResult (res) |> ignore
            }))
         |> ignore

    Async.AwaitTask tcs.Task

let async1 = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 1 }
let async2 = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 2 }

printfn "%d" <| ([|async1;async2|] |> any |> Async.RunSynchronously)

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接