等待异步工作流程的取消。

4
< p > CancellationTokenSource 对象的“取消”成员 “传达了取消请求”,这意味着它是“点火并忘记”的,不会等待取消完成(例如所有异常处理程序都已运行)。虽然很好,但我需要在创建另一个异步操作之前等待未完成的异步操作完全被取消。有没有简单的方法来实现这个?

2个回答

4
我认为在F#异步库的标准库函数中没有直接的方法来实现这一点。最接近的操作是使用Async.TryCancelled,它会在工作流程被(实际)取消时运行回调函数,但是从回调函数发送消息到启动工作流的代码必须手动完成。
使用事件和我编写的 F# 异步扩展中包含的一个扩展程序(也包括在 FSharpX 包中)可以相对容易地解决这个问题——该扩展程序是GuardedAwaitObservable,可用于等待事件的发生(该事件可以由某些操作立即触发)。
下面的Async.StartCancellable方法接受一个异步工作流并返回Async<Async<unit>>。当您在外部工作流上绑定时,它会启动参数(如Async.StartChild),当您在返回的内部工作流上绑定时,它会取消计算并等待直到它实际被取消:
open System.Threading

module Async = 
  /// Returns an asynchronous workflow 'Async<Async<unit>>'. When called
  /// using 'let!', it starts the workflow provided as an argument and returns
  /// a token that can be used to cancel the started work - this is an
  /// (asynchronously) blocking operation that waits until the workflow
  /// is actually cancelled 
  let StartCancellable work = async {
    let cts = new CancellationTokenSource()
    // Creates an event used for notification
    let evt = new Event<_>()
    // Wrap the workflow with TryCancelled and notify when cancelled
    Async.Start(Async.TryCancelled(work, ignore >> evt.Trigger), cts.Token)
    // Return a workflow that waits for 'evt' and triggers 'Cancel'
    // after it attaches the event handler (to avoid missing event occurrence)
    let waitForCancel = Async.GuardedAwaitObservable evt.Publish cts.Cancel
    return async.TryFinally(waitForCancel, cts.Dispose) }

编辑:按照Jon的建议,将结果包装在TryFinally中以处理CancellationTokenSource的释放。我认为这应该足以确保正确地进行了释放。

下面是一个使用该方法的示例。 loop函数是我用于测试的简单工作流程。 其余的代码启动它,等待5.5秒然后取消它:

/// Sample workflow that repeatedly starts and stops long running operation
let loop = async {
  for i in 0 .. 9999 do
    printfn "Starting: %d" i
    do! Async.Sleep(1000)
    printfn "Done: %d" i }

// Start the 'loop' workflow, wait for 5.5 seconds and then
// cancel it and wait until it finishes current operation  
async { let! cancelToken = Async.StartCancellable(loop)
        printfn "started"
        do! Async.Sleep(5500)
        printfn "cancelling"
        do! cancelToken
        printfn "done" }
|> Async.Start

为了完整起见,来自FSharpX的必要定义示例可以在F#片段上找到。


1
你应该处理 CancellationTokenSource 吗? - J D
我认为这很重要。我曾经写过一篇关于FSharp.Core中泄漏问题的文章,我相信这个问题是由于没有处理CTS而导致的:http://t0yv0.blogspot.com/2011/12/solving-f-asyncstartchild-leak-futures.html - t0yv0
@JonHarrop 这是一个很好的观点。我不确定在这种情况下是否会导致泄漏,但调用Dispose肯定更好。我编辑了答案,在计算被取消(并且取消已完成)后,在终结器中调用Dispose - Tomas Petricek
@TomasPetricek 我认为如果你在循环中调用代码,例如在服务器中一遍又一遍地调用StartCancellable,可能会发生泄漏。通过Dispose,应该没问题了,感谢纠正。 - t0yv0
@TomasPetricek 你的代码中有一个可能不太直观的方面,即如果工作代码在取消发生之前正常完成或出现异常,则等待取消将无限期地等待。这就是为什么我更喜欢我的答案,尽管它可能与你的答案在其他方面相同的原因。 - t0yv0
@toyvo 那是一个好观点 - 你可以很容易地扩展我的实现,以便在计算完成时触发事件(只需在 Async.TryCancelled 之外再使用 async.TryFinally 包装 work)。 - Tomas Petricek

4

使用易于使用的同步原语,这应该不难。我特别喜欢只写一次的“逻辑”变量:

type Logic<'T> =
    new : unit -> Logic<'T>
    member Set : 'T -> unit
    member Await : Async<'T>

将 Async 包装到逻辑变量上以在完成后设置它,然后等待它非常容易,例如:

type IWork =
    abstract member Cancel : unit -> Async<unit>

let startWork (work: Async<unit>) =
    let v = Logic<unit>()
    let s = new CancellationTokenSource()
    let main = async.TryFinally(work, fun () -> s.Dispose(); v.Set())
    Async.Start(main, s.Token)
    {
        new IWork with
            member this.Cancel() = s.Cancel(); v.Await
    }

逻辑变量的一个可能实现如下:

type LogicState<'T> =
    | New
    | Value of 'T
    | Waiting of ('T -> unit)

[<Sealed>]
type Logic<'T>() =
    let lockRoot = obj ()
    let mutable st = New
    let update up =
        let k =
            lock lockRoot <| fun () ->
                let (n, k) = up st
                st <- n
                k
        k ()

    let wait (k: 'T -> unit) =
        update <| function
            | New -> (Waiting k, ignore)
            | Value value as st -> (st, fun () -> k value)
            | Waiting f -> (Waiting (fun x -> f x; k x), ignore)

    let await =
        Async.FromContinuations(fun (ok, _, _) -> wait ok)

    member this.Set<'T>(value: 'T) =
        update <| function
            | New -> (Value value, ignore)
            | Value _ as st -> (st, ignore)
            | Waiting f as st -> (Value value, fun () -> f value)

    member this.Await = await

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接