有没有Clojure惯用语可以并行调度多个表达式?

5
我有一系列表达式,储存在一个向量中 [expr1 expr2 expr3 ...] (但这些表达式尚未求值)。
我希望把每个表达式分别交给不同的线程,并等待其中之一返回一个值。此时,我对于其他线程的结果不感兴趣,想要取消它们以节省 CPU 资源。
(我意识到这可能会导致非确定性,即程序的不同运行可能会导致不同的表达式首先被求值。我已经解决了这个问题。)
是否有一种标准/惯用的方法来实现上述目标?
5个回答

5

以下是我的理解:

基本上,您需要在每个future中解析全局promise,然后返回包含future列表和已解析值的向量,最后取消列表中的所有futures:

(defn run-and-cancel [& expr]
    (let [p (promise)
          run-futures (fn [& expr] [(doall (map #(future (deliver p (eval %1))) expr)) @p])
          [fs res] (apply run-futures expr)]
        (map future-cancel fs)
        res))

谢谢soulcheck(以及所有做出贡献的人)。我逐渐朝着与上面相同的解决方案前进,但我的代码不够简洁或优雅;我是这门语言的新手,需要一些练习;) - Sean Holdsworth

2
它还没有正式发布,但core.async看起来可能是解决您的问题以及其他异步问题的一个有趣的方式。

core.async的Leiningen命令(当前)如下:

[org.clojure/core.async "0.1.0-SNAPSHOT"]

这里有一些代码,可以创建一个函数,它会接收一些耗时的函数,并且会阻塞直到其中一个函数返回。
(require '[clojure.core.async :refer [>!! chan alts!! thread]]))     

(defn return-first [& ops]
  (let [v (map vector ops (repeatedly chan))]
    (doseq [[op c] v]
      (thread (>!! c (op))))
    (let [[value channel] (alts!! (map second v))]
         value)))

;; Make sure the function returns what we expect with a simple Thread/sleep
(assert (= (return-first (fn [] (Thread/sleep 3000) 3000)
                         (fn [] (Thread/sleep 2000) 2000)
                         (fn [] (Thread/sleep 5000) 5000))
            2000))

在上面的示例中:
  • chan 创建一个异步通道
  • >!! 将一个值放入通道
  • thread 在另一个线程中执行主体
  • alts!! 接受一个通道向量,并在任何一个通道中出现值时返回

这只是其中一部分,我仍在努力理解它,但这里有一个演练:https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj

David Nolen 的博客也有一些很棒的、令人费解的帖子(http://swannodette.github.io/)。

编辑

刚刚看到Michał Marczyk在这里回答了一个非常类似的问题,而且回答得更好,而且它允许您取消/短路。 使用Clojure线程处理长时间运行的进程并比较它们的结果

1
你想要的是Java的CompletionService。我不知道有没有在Clojure中对此进行封装的方法,但使用互操作并不难实现。下面的示例基于ExecutorCompletionService的JavaDoc页面上的示例。
(defn f [col] 
    (let [cs (ExecutorCompletionService. (Executors/newCachedThreadPool))
          futures (map #(.submit cs %) col)
          result (.get (.take cs))]
        (map #(.cancel % true) futures)
        result))

使用ExecutorCompletionService是一个好建议。另一方面,你的代码不会起作用,因为它忽略了map的惰性:没有任务会被提交,而.take调用将永远阻塞。 - Daniel Dinnyes
谢谢你关于 map 的输入;我之前没有注意到。我想我可以把 map 调用包装在 doall 中吗? - Kevin
确切地说,您必须将它们包装在“doall”中。在取消调用的情况下,使用“doseq”会更加惯用。 - Daniel Dinnyes

0

您可以使用future-call获取所有future的列表,并将它们存储在Atom中。然后,将每个正在运行的future与一个“shoot the other ones in the head”函数组合起来,以便第一个future可以终止剩余的所有future。 这里有一个示例:

(defn first-out [& fns]
  (let [fs (atom [])
        terminate (fn [] (println "cancling..") (doall (map future-cancel @fs)))]
    (reset! fs (doall (map (fn [x] (future-call #((x) (terminate)))) fns)))))

(defn wait-for [n s]
  (fn [] (print "start...") (flush) (Thread/sleep n) (print s) (flush)))

(first-out (wait-for 1000 "long") (wait-for 500 "short"))

编辑

刚刚注意到之前的代码没有返回第一个结果,因此它主要用于副作用。这里有另一个版本,它使用 Promise 返回第一个结果:

(defn first-out [& fns]
  (let [fs (atom [])
        ret (promise)
        terminate (fn [x] (println "cancling.." ) 
                          (doall (map future-cancel @fs))
                          (deliver ret x))]
    (reset! fs (doall (map (fn [x] (future-call #(terminate (x)))) fns)))
    @ret))

(defn wait-for [n s]
  "this time, return the value"
  (fn [] (print "start...") (flush) (Thread/sleep n) (print s) (flush) s))

(first-out (wait-for 1000 "long") (wait-for 500 "short"))

-1

虽然我不知道是否有一种惯用的方法来实现你的目标,但Clojure Future看起来很合适。

接受一组表达式并生成一个future对象,该对象将在另一个线程中调用该组表达式,并将缓存结果并在所有后续调用deref/@时返回它。如果计算尚未完成,则调用deref/@将阻塞,除非使用带有超时的deref变体。


你如何避免等待所有的 futures 完成? - soulcheck
是的,我正在考虑使用futures,但我看不到避免不必要阻塞的方法。假设expr1需要10秒来评估,expr2需要1秒来评估,而expr3需要100秒。在一般情况下,我如何知道我应该调用deref/@来评估expr2,因为我没有关于可能的评估时间的先验信息;否则,我可以在线程中评估expr2并完成它 ;) - Sean Holdsworth
@SeanHoldsworth 通过使用(实现?)也许可以实现。 - Chiron
@Chiron那不是意味着某种形式的忙等待吗? - soulcheck

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接