Clojure core.async,有没有办法控制(go...)线程池中的线程数量?

6
默认情况下,(go..)会使用核数的两倍加42个线程作为线程池。是否有办法通过设置环境变量或其他方式来设置线程数或代码可以使用的CPU数量?
在Linux机器上,我可以使用taskset来设置CPU数量,例如: taskset -c 0,1 my_Java_or_Clojure_program, 尽管taskset似乎对(-> (java.lang.Runtime/getRuntime) .availableProcessors)返回的数字无效。

1
在 core.async 的代码中,它在哪里指定将使用“2 + 核心数量”?我查了一些资料,但没有找到。 - David J.
1
@David 从这里开始 http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/ - Kevin Zhu
1
@David 从源代码来看,似乎是处理器数量的两倍加上42,https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/exec/threadpool.clj - Kevin Zhu
2个回答

18
当前被接受的答案在此提交之前是有效的:这个提交,所以现在基本上有两种情况:
  • 如果您只想更改池中线程的最大数量,则将该数字作为Java属性 clojure.core.async.pool-size 传递即可(默认为8)。

  • 如果您想替换ExecutorService,则使用相同的alter-var-root技巧,但指向新实现(有一个协议需要实现):

  •  (ns your-app.threadpool
       (:require [clojure.core.async.impl.protocols :as protocols]
                 [clojure.core.async.impl.concurrent :as conc]
                 [clojure.core.async.impl.exec.threadpool :as tp])
       (:import java.util.concurrent.Executors))
    
     (defonce my-executor
       (let [executor-svc (Executors/newFixedThreadPool
                           1
                           (conc/counted-thread-factory "async-dispatch-%d" true))]
         (reify protocols/Executor
            (protocols/exec [this r]
              (.execute executor-svc ^Runnable r)))))
    
     (alter-var-root #'clojure.core.async.impl.dispatch/executor
                     (constantly (delay my-executor)))
    

11

在当前的Clojure core.async版本中,线程池执行器位于clojure.core.async.impl.dispatch命名空间中。您可以更改executor变量并提供自定义线程池ExecutorService

(ns sandbox
  (:require [clojure.core.async.impl.concurrent :as conc]
            [clojure.core.async.impl.exec.threadpool :as tp]
            [clojure.core.async :as async]))

(defonce my-executor
  (java.util.concurrent.Executors/newFixedThreadPool
   1
   (conc/counted-thread-factory "my-async-dispatch-%d" true)))

(alter-var-root #'clojure.core.async.impl.dispatch/executor
                (constantly (delay (tp/thread-pool-executor my-executor))))

(async/go
 (println 
  (Thread/currentThread))) ;=> #<Thread Thread[my-async-dispatch-1,5,main]>

注意:Core.async仍处于alpha版本,因此希望这将来会有所改变。


2
我只想补充一点,这只有在 core.async 调用创建函数之前修改变量才能起作用,因此这应该尽早在程序中被要求。 - Andrea Richiardi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接