如何在Clojure中实现具有提前终止的并行逻辑或操作

7
我想定义一个谓词,它以相应输入的谓词(它们可以作为调用的惰性序列给出)作为输入,在并行运行它们并计算结果的逻辑或,以便谓词调用在返回true时整个计算也终止(返回true)。除了提供时间优化外,这还有助于避免某些情况下的非终止(一些谓词调用可能不会终止)。实际上,将非终止解释为第三个undefined值,该谓词模拟了Kleene's K3 logic(在最初的Kleene代数中的连接)中的或操作。Haskell系列中的这里也介绍了类似的内容。是否有任何(最好是简单的)方法在Clojure中实现此功能?编辑:我决定在阅读评论后添加一些澄清。(a)首先,线程池用尽后发生的情况不太重要。我认为创建一个足够满足我们需要的线程池是一个合理的惯例。(b)最关键的要求是谓词调用开始并行运行,并且一旦谓词调用返回true,所有正在运行的其他线程都会被中断。预期的行为是:(1)如果有一个返回true的谓词调用:并行或者返回true;(2)否则,如果有一个谓词调用没有终止:并行或者不会终止;(3)否则:并行或者返回false。换句话说,它的行为像由false<undefined<true给出的3元格中的连接,其中undefined表示非终止。(c)并行或应该能够以许多谓词和许多谓词输入作为输入。但是,如果它以惰性序列作为输入,则会更好。然后,将并行或命名为pany(表示“parallel any”),我们可以使用以下调用:
作为最后的备注,我认为像pany、双重pall或构建这种早期终止并行规约的机制,应该易于实现甚至内置在像Clojure这样面向并行性的语言中。

经过几次尝试,我最终得到了这个。它将在可调整大小的Java线程池中运行谓词检查,并在找到结果后立即跳过进一步检查。问题是,如果没有找到真正的结果,该代码将无限期地挂起,因为承诺永远不会被传递,因此解除引用承诺将永远阻塞。我无法找到解决方案,但认为我所拥有的可以作为基础。 - Carcigenicate
或者切换到ExecutorService的“invokeAny”方法。看起来好像会超时。虽然我以前从未使用过它。 - Carcigenicate
@Carcigenicate:非常感谢您所付出的努力!请查看我的编辑以获得更清晰的解释。 - peter pun
@pete23:我并没有要求任何人将这篇论文中提到的所有内容转移到Clojure上!我只是想知道这种类型的并行约简是否可以以简单的方式实现,并将该论文作为类似事物的参考,因为它是我最相关的搜索结果。如果难以实现且像我认为的那样自然,这可能会被提议作为Clojure的未来补充吗? - peter pun
我认为将线程池耗尽移出范围,你正在指定一个玩具。任何实现都应该处理前n个计算不终止的情况,其中n > 线程数。 - pete23
显示剩余5条评论
2个回答

1
我将通过一个简化函数来定义我们的谓词。实际上,我们可以重新实现所有Clojure迭代函数以支持此并行操作,但我只会使用reduce作为示例。
我将定义一个计算函数。我将使用相同的函数,但没有任何限制你拥有多个。如果它积累到1000,则该函数是“true”。
(defn computor [acc val]
        (let [new (+' acc val)] (if (> new 1000) (reduced new) new)))

(reduce computor 0 (range))
;; =>
1035

(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation

;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)] 
                 [computor 0 (range Long/MIN_VALUE 0)]])

现在让我们来到重点。我想在每次计算中迈出一步,如果其中一个计算完成,我想返回它。实际上,使用pmap逐步执行是非常缓慢的 - 工作单位太小,不值得线程化。在这里,我已经更改了事情,以便在继续之前对每个工作单位进行1000次迭代。您可能会根据工作负载和一步操作的成本进行调整。
(defn p-or-reducer* [reductions]
        (let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

我然后将这个包装在一个驱动程序中。
(defn p-or [s]
  (p-or-reducer* (map #(apply reductions %) s)))

(p-or predicates)
;; =>
1035

在哪里插入CPU并行性?将p-or-reducer*中的s/map/pmap/即可。我建议只并行化第一个操作,因为这将驱动缩减序列进行计算。

(defn p-or-reducer* [reductions]
        (let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
                             [computor 0 (range)]))

(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not

很难定义一个高性能的通用版本。如果不知道每次迭代的成本,就很难得出有效的并行策略——如果一次迭代需要10秒,那么我们可能只会一步一步地进行。如果它只需要100纳秒,那么我们需要一次性进行多个步骤。


0

您会考虑采用core.async来处理并行任务,用async/goasync/thread,并使用async/alts!实现早期返回吗?

例如,将核心的or函数从串行转换为并行。我们可以创建一个宏(我称之为por),将输入函数(或谓词)包装到async/thread中,然后在它们之上使用套接字选择async/alts!

(defmacro por [& fns]
  `(let [[v# c#] (async/alts!!
                  [~@(for [f fns]
                       (list `async/thread f))])]
     v#))

(time
 (por (do (println "running a") (Thread/sleep 30) :a)
      (do (println "running b") (Thread/sleep 20) :b)
      (do (println "running c") (Thread/sleep 10) :c)))
;; running a
;; running b
;; running c
;; "Elapsed time: 11.919169 msecs"
;; => :c

与串行运行的原始或者相比:

(time
 (or (do (println "running a") (Thread/sleep 30) :a)
     (do (println "running b") (Thread/sleep 20) :b)
     (do (println "running c") (Thread/sleep 10) :c)))
;; running a
;; => :a
;; "Elapsed time: 31.642506 msecs"

如果n个谓词不停止运行,而n又高于core.async线程池的容量,会发生什么情况? - pete23
处理这个问题的一种方法是使用一个函数来启动并行或子句,该函数在超时后返回默认答案,然后我们可以确保调用者始终会得到一个答案。处理运行时间过长的任务需要对任务本身有一定的了解(即如何取消任务)。对于从go通道获取输入的go块任务,我将简单地使用另一个通道作为控制信号以进行早期终止(使用alts!)。 - rmcv
即使调用者得到了一个答案,工作线程仍将继续忙碌! - pete23
如果您将该函数分解为一个go块,其中迭代器可以接收“step”消息或“terminate”消息,那么我认为您已经完成了。关键是将循环控制外部化,以便终止在协调员手中。 - pete23

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接