如何最佳关闭一个Clojure core.async进程管道

9
我有一个Clojure处理应用程序,它是一系列通道的管道。每个处理步骤都是异步进行计算的(即使用http-kit或其他方式进行http请求),并将其结果放在输出通道上。这样下一步就可以从该通道读取并进行计算。
我的主要函数如下:
(defn -main [args]
 (-> file/tmp-dir
  (schedule/scheduler)
  (search/searcher)
  (process/resultprocessor)
  (buy/buyer)
  (report/reporter)))

目前,调度器步骤驱动管道(它没有输入通道)并为链条提供工作负载。

当我在 REPL 中运行此命令时:

(-main "some args")

基于调度程序的无限循环,使其可以永远运行。我需要改变这种架构,以便可以通过 REPL 关闭整个系统。关闭每个通道是否意味着系统终止?
广播通道是否有所帮助?

不幸的是,这也会终止REPL。我将尝试使用组件方法。 - Marten Sytema
3个回答

7
您可以将您的调度程序 alts! / alts!! 放置在 kill 通道和管道的输入通道上:
(def kill-channel (async/chan))

(defn scheduler [input output-ch kill-ch]
  (loop []
    (let [[v p] (async/alts!! [kill-ch [out-ch (preprocess input)]]
                  :priority true)]
      (if-not (= p kill-ch)
        (recur))))

kill-channel设置为某个值将终止循环。从技术上讲,您还可以使用output-ch来控制流程(关闭的通道的输出将返回false),但我通常认为显式的kill channel更加清晰,至少对于最高层的管道而言。
为了使事情同时更加优雅且更加方便使用(无论是在REPL还是在生产中),您可以使用Stuart Sierra的component,在组件的start方法中启动调度程序循环(在单独的线程上)并将kill channel附加到组件上,并在组件的stop方法中关闭kill channel(从而终止循环)。

5
我建议您使用类似https://github.com/stuartsierra/component的东西来处理系统设置。它确保您可以在REPL中轻松启动和停止系统。使用该库,您将设置每个处理步骤为组件,并且每个组件都将在其startstop协议中处理通道的设置和撤销。此外,您可能会为每个组件创建一个IStream协议并使每个组件依赖于实现该协议的组件。它为您提供了一些非常简单的模块化。
您最终将得到以下系统:
(component/system-map
 :scheduler (schedule/new-scheduler file/tmp-dir)
 :searcher  (component/using (search/searcher)
                             {:in :scheduler})
 :processor (component/using (process/resultprocessor)
                             {:in :searcher})
 :buyer     (component/using (buy/buyer)
                             {:in :processor})
 :report    (component/using (report/reporter)
                             {:in :buyer}))

这种方法的好处之一是如果其他组件也依赖于通道,您可以轻松地添加这些组件。例如,如果每个组件都使用内部的mult上的tap创建自己的通道,那么您只需添加一个以处理器为依赖项的记录器组件即可。

 :processor (component/using (process/resultprocessor)
                             {:in :searcher})
 :processor-logger (component/using (log/logger)
                                    {:in processor})

我建议您观看他的演讲,以便更好地了解它的工作原理。 链接

1
你应该考虑使用Stuart Sierra的重新加载工作流程,它依赖于将你的“管道”元素建模为组件,这样你就可以将逻辑单例建模为“类”,这意味着你可以控制每个逻辑单例的构造和销毁(启动/停止)逻辑。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接