在内部 Lambda 中调用异步方法?“此结构只能在计算表达式中使用”

4
我有以下代码。
let rec consume() : Async<unit> = async {
    .....
    listA 
    |> Seq.iter(fun i ->
        .....
        let listB : seq<...> option = 
            let c = getListB a b
            match c with 
            | Some d -> Seq.filter(....) |> Some
            | None -> None 
        match listB with .....
    ....

现在函数getListB已经转换为返回async<Seq<B>>而不是Seq<B>。所以代码被转换成以下形式。然而,getListB会阻塞执行。如何将其重写为非阻塞方式?仅仅将该行代码转换为let! c = getListB a b并不能解决问题,因为这段代码在内部lambda中。错误信息是“此结构只能在计算表达式中使用”。
let rec consume() : Async<unit> = async {
    .....
    listA 
    |> Seq.iter(fun i ->
        .....
        let listB : seq<...> option = 
            let c = getListB a b |> Async.RunSynchronously
            match c with 
            | Some d -> Seq.filter(....) |> Some
            | None -> None 
2个回答

2
答案取决于您想要顺序或并行地运行序列中的每个元素。在这两种情况下,首先使用Seq.map而不是Seq.iter,然后您可以在lambda内部放置另一个async块,使得map的结果为seq<Async<'a>>。
顺序执行:
对于此操作,您需要在额外的Async模块中定义一些额外的函数。
module Async = 
    let map f x =
        async{
            let! x = x
            return f x
        }

    let lift2 f x1 x2 = 
        async{
            let! x1 = x1
            let! x2 = x2
            return f x1 x2
        }

    let return' x = async { return x }

    let mapM mFunc sequ =
        let consF x ys = lift2 (fun h t -> h::t) (mFunc x) ys
        Seq.foldBack(consF) sequ (return' [])
        |> map (Seq.ofList)

    let sequence sequ = mapM id sequ

你可能已经看到过 mapM 在其他地方被称为 traverse,它们基本上是同一个概念的不同名称。 sequence 函数只是 mapM 的一种特殊情况,其中提供的绑定函数只是恒等函数 (id)。它的类型是 seq<Async<'a>> -> Async<seq<'a>>,即将 AsyncSeq 内部翻转到外部。
然后,你只需要将 Seq.map 的结果传送到 sequence 函数中,就可以得到一个 async 值。
由于你的示例代码不完整,因此我编写了一些示例代码来演示如何使用它:
let sleep = Async.Sleep 100
let sleeps = Seq.init 15 (fun _ -> sleep)
let sequencedSleeps = Async.sequence sleeps
Async.RunSynchronously sequencedSleeps
Real: 00:00:01.632, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : seq<unit> =
  [null; null; null; null; null; null; null; null; null; null; null; null;
   null; null; null]

并行处理

如果要并行处理序列中的每个元素,而不是按顺序执行,可以采用以下方法:

let pSequence sequ = Async.Parallel sequ |> Async.map (Seq.ofArray)

示例测试代码:

let pSleeps = pSequence sleeps;;
Async.RunSynchronously pSleeps;;
Real: 00:00:00.104, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : seq<unit> = seq [null; null; null; null; ...]

注意执行时间取决于所选择的方法。


对于返回seq<unit>并且想忽略结果的情况,定义一些额外的辅助函数可能很有用,例如:

let sequenceIgnore sequ = sequ |> Async.sequence |> Async.map (ignore)

let pSequenceIgnore sequ = sequ |> pSequence |> Async.map (ignore)

这样你就可以返回单个unit而不是多余的序列。


2
我认为你所描述的问题归结为如何将seq<Async>转换为Async<seq>。Scott Wlaschin在这篇文章中对此进行了全面描述。这是他所描述的概念的一个简单实现,而这些概念更加强大和通用。总体思路是,我们希望延迟序列的创建,直到我们拥有Async<_>实例所承诺的值。
let traverseSequence ( seqAsync : seq<Async<'a>>) = 

    let promiseOfAnEmptySequence = async  { return Seq.empty }

    let delayedCalculation (asyncHead : Async<'a>) (asyncTail : Async<seq<'a>>) = 
        async {
        let! calculatedHead = asyncHead
        return!
            async {
                let! calculatedTail = asyncTail
                return calculatedHead |> Seq.singleton |> Seq.append(calculatedTail)
            }
        }

    Seq.foldBack delayedCalculation seqAsync promiseOfAnEmptySequence

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接