Clojure核心异步库core.async,在超时后CPU卡死。有没有方法可以正确关闭(go..)块生成的宏线程?

7

基于core.async演示例子,我创建了下面类似的代码来处理一些使用多个通道进行的耗费CPU的工作,带有10秒的超时时间。然而,在主线程返回后,CPU使用率仍保持在大约700%(8个CPU的机器)。我必须手动在emacs中运行nrepl-close才能关闭Java进程。

是否有任何适当的方法来终止由(go..)块产生的宏线程?我尝试关闭每个通道,但它不起作用。我想确保主线程返回后Java进程的CPU使用率回到0。

(defn [] RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB (do...   (str ...)))


(let [n 1000
      cs (repeatedly n chan)]
  (doseq [c cs] 
    (go 
     (>! c  (RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB ))))

  (dotimes [i n]
    (let [[result source] (alts!!  (conj cs (timeout 10000))) ]  ;;wait for 10 seconds for each job
      (if  (list-contains? cs source)  ;;if returned chan belongs to cs 
        (prn "OK JOB FINISHED " result)
        (prn "JOB TIMEOUT")
        )))

 (doseq [i cs]
   (close! i))  ;;not useful for "killing" macro thread

 (prn "JOBS ARE DONE"))

;;Btw list-contains? function is used to judge whether an element is in a list
;;https://dev59.com/MHA75IYBdhLWcg3wg5fs
(defn list-contains? [coll value]
  (let [s (seq coll)]
    (if s
      (if (= (first s) value) true (recur (rest s) value))
      false)))

1
由于go块在固定的线程池中运行,不要将它们用于CPU密集型任务。但是,在调用CPU密集型函数之前(甚至在计算过程中),您可以检查通道是否已关闭。这与在常规基于线程的clojure/java中检查Thread/isInterrputed并没有太大区别。 - cgrand
你可以以多种方式终止go block。可以使用像(while @running ...)这样的原子,或者让该块在每次迭代时从另一个通道中获取数据,只要您希望go block继续运行即可。 - Leon Grapenthin
1
顺便说一下,我会使用一个集合来存储这些频道。这样可以避免使用 list-contains? 函数,并且只需要简单地使用 (if (cs source) ...) 即可。 - Leon Grapenthin
@cgrand,到目前为止,我发现它在CPU密集型任务方面运行良好,也许你的意思是它不适用于I/O阻塞任务?有文章称应避免在go块中使用I/O阻塞任务。http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/ - Kevin Zhu
1
@KevinZhu,“intensive”指的是“非常长”。如果你启动了大量(大于线程池大小)的go块来计算pi的小数位数(愚蠢的例子),那么所有后续的go块都必须等待。这与阻塞I/O相同,只是更少见。 - cgrand
3个回答

2
在REPL中似乎还没有干净的方法。我最初尝试了一种非常不好的方式,使用已弃用的Thread.stop方法。
 (doseq [i @threadpool ]
              (.stop i))

似乎当主线程返回到REPL时,CPU使用率下降了,但如果我在REPL中再次运行程序,它就会在go block部分挂起!

然后我在谷歌上搜索并找到了这篇博客,它说:

最后需要注意的一点是:我们没有明确地做任何工作来关闭go routines。当主函数退出时,go routines会自动停止操作。因此,go routines就像JVM中的守护线程(好吧,除了“线程”部分……)

因此,我尝试将我的项目制作成一个uberjar,并在命令控制台上运行它,结果发现当闪烁的光标返回到控制台时,CPU使用率会立即下降!


1
(shutdown-agents)

针对实现,JVM:代理和通道都使用全局线程池,并且代理的终止函数会迭代并关闭VM中所有打开的线程。首先清空通道:这个操作是立即的且不可逆(特别是如果你在REPL中)。


1

根据另一个相关问题的答案如何控制(go...)中线程的数量?,我找到了一种更好的方法来正确终止(go...)块启动的所有线程:

首先改变executor变量并提供自定义线程池。

;; def, not defonce, so that the executor can be re-defined
;; Number of threads are fixed to be 4
(def my-executor
  (java.util.concurrent.Executors/newFixedThreadPool
   4
   (conc/counted-thread-factory "my-async-dispatch-%d" true)))

(alter-var-root #'clojure.core.async.impl.dispatch/executor
                (constantly (delay (tp/thread-pool-executor my-executor))))

在 (go...) 块的末尾调用 .shutdownNow 和 .awaitTermination 方法。
(.shutdownNow my-executor)
(while (not  (.awaitTermination  my-executor 10 java.util.concurrent.TimeUnit/SECONDS ) )
       (prn "...waiting 10 secs for executor pool to finish") )

[更新] 上面的关闭执行器方法似乎不够纯粹。我这种情况的最终解决方案是将一个具有自己超时控制的函数发送到go块中,使用thunk-timeout函数。感谢this post。下面是示例

(defn toSendToGo [args timeoutUnits]
  (let [result (atom nil)  
        timeout? (atom false)]
    (try
      ( thunk-timeout
        (fn []  (reset! result  (myFunction args))) timeoutUnits)
      (catch  java.util.concurrent.TimeoutException e  (do  (prn "!Time out after " timeoutUnits " seconds!!") (reset! timeout? true))     ))

    (if @timeout?  (do sth))
    @result))


(let [c ( chan)]
  (go (>! c (toSendToGo args timeoutUnits))))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接