Clojure核心异步库用于数据计算

5
我已经开始使用Clojure core.async库。我发现CSP、通道和go块的概念非常容易使用。但是,我不确定自己是否使用得正确。下面是我的代码 -
(def x-ch (chan))
(def y-ch (chan))
(def w1-ch (chan))
(def w2-ch (chan))

; they all return matrices
(go (>! x-ch (Mat/* x (map #(/ 1.0 %) (max-fold x)))))
(go (>! y-ch (Mat/* y (map #(/ 1.0 %) (max-fold y)))))
(go (>! w1-ch (gen-matrix 200 300)))
(go (>! w2-ch (gen-matrix 300 100)))

(let [x1 (<!! (go (<! x-ch)))
        y1 (<!! (go (<! y-ch)))
        w1 (<!! (go (<! w1-ch)))
        w2 (<!! (go (<! w2-ch)))]

    ;; do stuff w/ x1 y1 w1 w2
)

我有预定义的(矩阵)向量,名称为符号“x”和“y”。在使用它们之前,我需要修改这两个向量。这两个向量都很大。我还需要生成两个随机矩阵。由于“go”宏启动计算是异步的,因此我将所有四个计算任务拆分成单独的go块,并将结果放入通道中。然后我使用let块从通道中获取值并将它们存储到符号中。它们都使用阻塞的“<!!”“take”函数,因为它们在主线程上执行。
基本上,我试图通过将程序片段拆分成异步进程来加速计算时间。请问这样做的方式正确吗?

2
你为什么想在这里使用异步代码?当你需要阻塞和等待时,异步代码非常有效。否则它什么也做不了。当你只需要进行数学计算时,应该直接或并行计算,例如在未来。 - JustAnotherCurious
谢谢。在我的代码中,我同时启动了4个计算,并阻塞/等待值直到它们返回结果。这与异步块/等待有何不同?但是,正如Nicolas和您指出的那样,我将使用future来实现。我不能在go块内使用core.reducers吗?或者那会是一个可怕的想法? - Lordking
你不会同时启动它们。一切都是按顺序执行的,因为在go块中没有有价值的阻塞操作。我定义了函数“prime?”来测试数字i是否为质数。看这里:”(time (do (count (filter true? (map prime? (range 2 20000))))))“花费3秒钟,而”(time (count (filter true? (map prime? (range 2 20000)))))“也花费3秒钟。你只使用了一个线程,在这里一切都是按顺序进行的。试试吧! - JustAnotherCurious
谢谢。我已经将我的代码切换为使用future。 - Lordking
3个回答

5

对于这种处理方式,future 可能更加适合。

链接中的例子很容易理解:

 (def f 
   (future 
     (Thread/sleep 10000) 
     (println "done") 
     100))

处理过程会立即启动未来块,因此上面的代码确实启动了一个线程,等待10秒后完成并打印"done"。

当你需要该值时,只需使用:

(deref f)
; or @f

这将会阻塞并返回未来代码块的值。

在同一个例子中,如果你在10秒钟内调用deref,调用将会被阻塞直到计算完成。

在你的例子中,由于你只是等待计算完成,并不太关心通道参与者之间的消息和交互,我建议使用future。因此:

 (future 
    (Mat/* x (map #(/ 1.0 %) (max-fold x))))

谢谢Nicolas。我会尝试一下。不过,如果我必须使用core.async,你对我的实现有什么想法呢?我正在努力理解CSP。 - Lordking

3

go块返回一个通道,其中包含表达式的结果,因此您不需要为其结果创建中间通道。下面的代码让您可以同时启动所有4个计算,然后阻塞值直到它们返回。如果您不需要立即使用某些结果,则只有在实际使用它时才能阻止该值。

(let [x1-ch (go (Mat/* x (map #(/ 1.0 %) (max-fold x))))
      y1-ch (go (Mat/* y (map #(/ 1.0 %) (max-fold y))))
      w1-ch (go (gen-matrix 200 300))
      w2-ch (go (gen-matrix 300 100))
      x1 (<!! x1-ch)
      y1 (<!! y1-ch)
      w1 (<!! w1-ch)
      w2 (<!! w2-ch)]
  ;; do stuff w/ x1 y1 w1 w2
  )

1
如果你想通过并行运行代码来加速程序的运行,那么你可以考虑使用Clojure的Reducers或Aphyr的Tesser。它们的工作原理是将单个计算中的工作分成可并行化的部分,然后将它们组合在一起。这将高效地在计算机上的所有核心上运行工作。如果你使用future或go块运行每个计算,则每个计算将在单个线程上运行,有些可能会比其他计算更快完成,这些核心将处于空闲状态。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接