Clojure中死简单的Fork-Join并发

6

我有两个昂贵的独立函数。我希望并行运行它们。我不想处理 futures 等内容(我是 Clojure 的新手,很容易混淆)。

我正在寻找一种简单的方法来同时运行这两个函数。我希望它能像以下方式工作:

(defn fn1 [input] ...) ; costly
(defn fn2 [input] ...) ; costly

(let [[out1 out2] (conc (fn1 x) (fn2 y))] ...)

我希望这个函数返回一个包含一对输出的向量。只有当两个线程都终止时,它才应该返回。理想情况下,这个函数应该适用于任意数量的输入。我认为这是一个简单的模式。


当你说你不想处理 futures 时,这是否意味着你也不希望在 "conc" 函数中使用 futures?据我所知,在这种情况下使用 Clojure 并发原语是惯用的,尽管它们可能会通过 "conc" 中的封装对您隐藏。 - JohnJ
肯定会使用一些并发原语。conc 可以像您喜欢的那样复杂。我只是不想作为用户处理它们。我猜这是“为每个输入启动一个 future”,“等待每个输出”,“返回”。也许它将不得不成为一个宏,不确定。 - MRocklin
1
肯定是一个宏,如果你想要将参数的评估推迟到线程中的conc。我现在正在处理宏定义。 - JohnJ
3个回答

4
在Clojure中使用Future非常容易。无论如何,这里有一个避免使用它们的答案。
(defn conc [& fns]
  (doall (pmap (fn [f] (f)) fns)))

"pmap"在内部使用futures。"doall"将强制计算序列。"
(let [[out1 out2] (conc fn1 fn2)]
        [out1 out2])

请注意,我在尝试保留您的示例时解构了out1out2

为了使用这个,你需要将输入封装在匿名函数中。例如:#(fn1 42) - Arthur Ulfeldt
@ArthurUlfeldt 没错,如果是这种情况,我会选择JohnJ的解决方案。 - Julien Chastang
我认为不将其包装在宏中会导致更好的可组合代码。宏不是一等公民,您无法将它们传递给map或将它们应用于参数列表。 - Arthur Ulfeldt

3

虽然其他答案指出了获得相同行为的其他方法,但您确实需要一个宏来保留所需的语法。以下是一种实现此目的的方法:

(defn f1 [x] (Thread/sleep 500) 5)
(defn f2 [y] 2)

(defmacro conc [& exprs]
  `(map deref
        [~@(for [x# exprs] `(future ~x#))]))

(time (let [[a b] (conc (f1 6) (f2 7))]
       [a b]))
; "Elapsed time: 500.951 msecs"
;= (5 2)

扩展部分展示了它的工作原理:
(macroexpand-1 '(conc (f1 6) (f2 7)))
;= (clojure.core/map clojure.core/deref [(clojure.core/future (f1 6)) 
;=                                       (clojure.core/future (f2 7))])

您指定了两个函数,但这应该适用于任意数量的表达式。

太棒了。这激发了我自己尝试一下。我试着用(map future exprs)替换for来实现它。可惜这并没有起作用,因为future本身就是一个宏。你有什么想法可以进一步简化它吗? - MRocklin
1
是的,这就是宏的问题——不能像函数那样轻松地组合它们。我不知道如何进一步简化它,但 Google Group 上的某个人可能知道。 - JohnJ

2

我知道您不想让最终解决方案暴露出futures,但是使用futures来说明如何做到这一点非常有用,然后将它们包装在某些隐藏这些细节的东西中:

core> (defn fn1 [input] (java.lang.Thread/sleep 2000) (inc input))
#'core/fn1                                                                                     
core> (defn fn2 [input] (java.lang.Thread/sleep 3000) (* 2 input))
#'core/fn2                                                                                     
core> (time (let [f1 (future (fn1 4)) f2 (future (fn2 4))] @f1 @f2))
"Elapsed time: 3000.791021 msecs"  

然后我们可以将其包装在许多Clojure的Future包装器中。最简单的包装器只是一个函数,它接受两个函数并并行运行它们。

core> (defn conc [fn1 fn2] 
         (let [f1 (future (fn1)) 
               f2 (future (fn2))] [@f1 @f2]))
#'core/conc                                                                                    
core> (time (conc #(fn1 4) #(fn2 4)))
"Elapsed time: 3001.197634 msecs"                                                                          

通过将 conc 函数接收运行函数而非待执行的主体,避免了编写宏的必要性。然后可以在调用前添加 # 来创建需要传递给函数的函数。

这也可以使用 map 和 future-call 实现:

core> (map deref (map future-call [#(fn1 4) #(fn2 42)]))
(5 84)  

你可以继续提高 conc 直到它类似于(正如 Julien Chastang 所指出的那样)pmap

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接