F# 异步工作流带有超时功能

10

我非常喜欢F#的 async 机制,但是对我来说,它有一个严重的问题:它不允许创建工作流,这些工作流的执行时间不能超过某个特定的时间间隔。

为了更清楚地说明,这里有一个我为自己编写的简单函数:

let withTimeout operation timeout = async {
    try
        return Some <| Async.RunSynchronously (operation, timeout)
    with :? TimeoutException -> return None
}

即签名为

val withTimeout : operation:Async<'a> -> timeout:int -> Async<'a option>

这里是示例用法:

let op = async { 
    do! Async.Sleep(1000) 
    return 1
}
#time
withTimeout op 2000 |> Async.RunSynchronously;;
// Real: 00:00:01.116, CPU: 00:00:00.015, GC gen0: 0, gen1: 0, gen2: 0
// val it : unit option = Some 1
withTimeout op 2000 |> Async.RunSynchronously;;
// Real: 00:00:01.004, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
// val it : unit option = Some 1
withTimeout op 500 |> Async.RunSynchronously;;
// Real: 00:00:00.569, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
// val it : unit option = None    

你可以看到,它按预期工作。它非常好,但也有点尴尬,我不确定它的安全性和可能出现的其他问题。也许我正在重新发明轮子,也许有一种好的、简洁的方法来编写这样的工作流程?


1
这种方法的问题在于超时后工作流程将继续执行。相反,你应该考虑使用取消操作,并在超时时间段之后发出取消令牌的信号。然而,这要求你的工作流程阶段能够响应取消通知。 - Lee
当你使用Async.RunSynchronously时,如果底层异步操作是I/O绑定的,你会阻塞当前线程并且失去效率。我认为我还没有看到过一个在实际中工作的实现,但我记得使用了一个丑陋的hack来将异步组件转换为可观察对象,然后合并它们。 - MisterMetaphor
@MisterMetaphor这就是为什么我将RunSynchronously封装到另一个async中。我猜这样可以保持并发性。但是,我同意这很丑陋。我更关心@Lee说的-我实际上并没有杀死正在运行的任务。但是目前我不知道如何修复它。 - Rustam
3个回答

11

更新: 目前最好的选择是由Vesa A.J.K在这里提出的:https://stackoverflow.com/a/26230245/1554463。 加上我的编辑后就像这样:

type Async with
    static member WithTimeout (timeout : int option) operation = 
        match timeout with
        | Some time  when time > 0 -> 
            async { 
                let! child = Async.StartChild (operation, time) 
                try 
                    let! result = child 
                    return Some result
                with :? TimeoutException -> return None 
            }
        | _ -> 
            async { 
                let! result = operation
                return Some result
            }

这里还有另外一个选项:

type Async with
    static member WithCancellation (token:CancellationToken) operation = 
        async {
            try
                let task = Async.StartAsTask (operation, cancellationToken = token)
                task.Wait ()
                return Some task.Result
            with 
                | :? TaskCanceledException -> return None
                | :? AggregateException -> return None
        }

    static member WithTimeout (timeout:int option) operation = 
        match timeout with
        | Some(time) -> 
            async {
                use tokenSource = new CancellationTokenSource (time)
                return! operation |> Async.WithCancellation tokenSource.Token
            }

        | _ -> 
            async { 
                let! res = operation
                return Some res
            }

我在这里使用 .Net 任务和CancellationToken


9
只需使用Async.StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>:
let with_timeout timeout action =
  async {
    let! child = Async.StartChild( action, timeout )
    return! child
  }

7
请看此处的Async.WhenAny实现(链接)。通过使用它,您可以类似于Task的实现方法来实现withTimeout,具体方法请参见此处。请注意,标签不应被更改。
let withTimeout dueTime comp =
    let success = async {
        let! x = comp
        return (Some x)
    }
    let timeout = async {
        do! Async.Delay(dueTime)
        return None
    }
    Async.WhenAny(success, timeout)

我不确定在第一个计算完成时,它是否会取消其他计算,但至少此实现不会不必要地阻塞线程。

+1 感谢您的考虑。我会思考如何扩展它以便处理取消。 - Rustam
只有这个解决方案适用于我 - 在 Xamarin 上测试过。 - Nghia Bui

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接