如何停止OCaml垃圾回收我的响应式事件处理程序?

9
我想使用OBus库和Lwt_react一起编程。这个库使用函数响应编程处理属性和信号。
问题在于OCaml可能会在你仍在使用回调函数时进行垃圾收集,这一点在React文档中也有提到。有一个keep函数可以永久保留处理程序,但我不想这样做。我确实希望最终能释放它,只是在我仍需要它的时候不要释放。
所以,我想把处理程序附加到一个开关上。
let keep ~switch handler =
  Lwt_switch.add_hook (Some switch) (fun () ->
    ignore handler;
    Lwt.return ()
  )

但是我的事件处理程序最终还是被垃圾回收了(这很合理,因为关闭开关的代码在信号到达时调用,所以只有信号处理程序才能使开关保持活动状态)。

这是我代码的简化版本:

(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

let finished_event, fire_finished = React.E.create ()

let setup () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  let dont_gc_me = Lwt_react.E.map handler finished_event in
  ignore dont_gc_me;  (* What goes here? *)

  print_endline "Waiting for signal...";
  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

let () =
  let finished = Lwt.protected (setup ()) in

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  Lwt_main.run finished;
  print_endline "Done";

没有 Gc.full_major 这一行,通常会打印出 Done。但是加上这一行后,程序就会卡在 Waiting for signal... 上。
编辑:我已经将真正的代码从测试驱动程序中拆分出来,并添加了一个 Lwt.protected 封装,以避免 Lwt 取消操作的意外掩盖问题。
4个回答

7
这里是我某个项目中的片段,已经修复了弱引用问题(谢谢!)。 第一部分是保持全局根指向您的对象。 第二部分是将信号/事件的生命周期限制在Lwt线程的范围内。
请注意,响应实体被克隆并明确停止,这可能不完全符合您的期望。
module Keep : sig 
  type t
  val this : 'a -> t
  val release : t -> unit
end = struct
  type t = {mutable prev: t; mutable next: t; mutable keep: (unit -> unit)}
  let rec root = {next = root; prev = root; keep = ignore}

  let release item =
    item.next.prev <- item.prev;
    item.prev.next <- item.next;
    item.prev <- item;
    item.next <- item;
    (* In case user-code keep a reference to item *)
    item.keep <- ignore

  let attach keep =
    let item = {next = root.next; prev = root; keep} in
    root.next.prev <- item;
    root.next <- item;
    item

  let this a = attach (fun () -> ignore a)
end

module React_utils : sig
  val with_signal : 'a signal -> ('a signal -> 'b Lwt.t) -> 'b Lwt.t
  val with_event  : 'a event -> ('a event -> 'b Lwt.t) -> 'b Lwt.t
end = struct
  let with_signal s f =
    let clone = S.map (fun x -> x) s in
    let kept = Keep.this clone in
    Lwt.finalize (fun () -> f clone)
                 (fun () -> S.stop clone; Keep.release kept; Lwt.return_unit)
  let with_event e f =
    let clone = E.map (fun x -> x) e in
    let kept = Keep.this clone in
    Lwt.finalize (fun () -> f clone)
                 (fun () -> E.stop clone; Keep.release kept; Lwt.return_unit)
end

用以下方法解决您的问题:
let run () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  (* We use [Lwt.async] because are not interested in knowing when exactly the reference will be released *)
  Lwt.async (fun () ->
    (React_utils.with_event (Lwt_react.E.map handler finished_event)
      (fun _dont_gc_me -> finished)));
  print_endline "Waiting for signal...";

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

我认为这解决的是相反的问题。你想在函数完成后停止接收事件,而我想在完成前继续接收事件。我现在已经更新了问题,并提供了一些示例代码来演示问题。我无法看到使用你的模块来修复它的方法。谢谢。 - Thomas Leonard
我不同意。我希望在线程完成之前持续接收事件,并在完成后停止接收。根据我的理解,第一部分正是你所追求的目标。 - Def
好的,它确实起作用了!但原因非常不明显。最终我通过查看Lwt源代码找到了答案。Lwt.bind调用会从最终任务返回到“finished”创建一个额外的引用,因为它是一个“可取消”的任务(取消“run”的结果将取消“finished”)。虽然我担心这有点脆弱。例如,在Gc行之前添加let finished = Lwt.protected finished in会再次破坏它。 - Thomas Leonard
你说得对,它确实无法工作,但一定有一种方法可以修复它...为什么一开始似乎能够工作对我来说很显然:lwt线程保留了对反应图的引用,因此在由Reactive_utils创建的lwt线程完成之前,反应图不能被垃圾回收。 - Def
2
为什么它不起作用是有趣和微妙的,我认为这是因为所有对finished的引用都很弱: "waker" 在React事件中被引用,它是弱引用。 "finished" 被局部范围引用,但在Lwt.protected调用之后,原始的finished无法从局部范围访问。由于对完成线程的两个可能引用都很弱,它们被垃圾回收,以及React_utils在其中执行的所有管道。我们需要的是一种使线程“强”的方法,这意味着保持对它的强引用,直到确定它。 - Def

1

这是我目前的(hacky)解决方案。每个处理程序都添加到全局哈希表中,然后在关闭开关时再次删除:

let keep =
  let kept = Hashtbl.create 10 in
  let next = ref 0 in
  fun ~switch value ->
    let ticket = !next in
    incr next;
    Hashtbl.add kept ticket value;
    Lwt_switch.add_hook (Some switch) (fun () ->
      Hashtbl.remove kept ticket;
      Lwt.return ()
    )

它的使用方式如下:
Lwt_react.E.map handler event |> keep ~switch;

1

处理这个问题的一种简单方法是保留对事件的引用,当你不再需要它时调用React.E.stop

(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

let finished_event, fire_finished = React.E.create ()

let run () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  let ev = Lwt_react.E.map handler finished_event in
  print_endline "Waiting for signal...";

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  React.E.stop ev;

  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

let () =
  Lwt_main.run (run ());
  print_endline "Done";

这是我的测试用例不好的问题(将真实代码与测试驱动程序代码混合)。我已经用更好的版本替换了它。在真实代码中,你不能在fire_finished之后立即调用stop,因为fire_finished发生在事件源而不是接收器。如果你把stop放在bind函数里面,你就会遇到和@Def答案一样的问题;它可能被垃圾回收。 - Thomas Leonard
我明白了。那么这是预料中的。 - dim

0
请注意,如果lwt不支持取消,则通过将Lwt.protected (setup ())替换为Lwt.bind (setup ()) Lwt.return会观察到相同的行为。
基本上你所拥有的是: finished_event --weak--> SETUP --> finished 其中SETUP是事件和Lwt线程之间的循环。移除Lwt.protected只是“压缩”了最后一个指针,所以它恰好做了你想要的事情。
Lwt只有前向指针(除了支持取消),而React只有后向指针(前向指针是弱引用)。因此,使其正常工作的方法是返回事件而不是线程。

好的,但是如果我返回一个事件,那么我就不能将其传递给Lwt_main.run或任何需要任务的其他东西,对吗?(这一切都发生在更大程序的小子例程中 - 具体来说,是0install执行PackageKit事务) - Thomas Leonard

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接