Clojure中的短路future

8
我有两个返回布尔值的未来。我想要做的事情基本上是这样的:
(if (or @future1 @future2) 
  ...)

但是,如果优化后的代码中任何一个 future 先完成且其返回值为 true,则无需等待剩余的 future 完成;直接进行下一步操作即可。当然,如果返回值为 false,则需要等待剩余 future 完成后再执行。是否有一种简单的方式来实现这个效果呢?


Dax,期货是否会“返回”值?不会,因为期货不是函数。期货(又称延迟或承诺)通过值解决,这在概念上与函数返回值非常不同。理解这种差异,你的问题应该变得更加清晰。 - Beetroot-Beetroot
@Beetroot-Beetroot 是的,我理解了。虽然鉴于这个问题缺乏答案,我想知道答案是否真的如此清晰明了。你有什么想法吗?我有一些想法,但大多数都很复杂,我希望能得到一个简单的答案来解决这个看似简单的问题。我已经更新了问题,并用更精确的措辞进行了说明。 - Dax Fohl
Dax,虽然我大致了解期货,但我不知道这种特定的语言。我认为@AlexTaggart的答案接近你想要的,但你需要通过函数调用获取两个承诺(可能是通过函数调用获得),而不是在“foo”内部生成它们。 - Beetroot-Beetroot
如果你能读懂JavaScript,那么我可以原则上向你展示如何做到这一点。你需要将其翻译回Clojure。 - Beetroot-Beetroot
4个回答

4

一般情况下,你可以向两个投递人做出同样的承诺。例如:

(defn foo []
  (let [p (promise)]
    (future
      (Thread/sleep (rand-int 1000))
      (deliver p :a))
    (future 
      (Thread/sleep (rand-int 1000))
      (deliver p :b))
    @p))

打电话给 (foo) 时,只要第一个 deliver 发生,就会随机产生 :a:b;另一个 deliver 将是无操作。
对于您的特定情况,您需要返回两个布尔值。我能想到的唯一办法(有点混乱)是使用三个 promise 在 deliverers 之间共享:
(defn foo []
  (let [a (promise)
        b (promise)
        link (promise)]
    (future
      (Thread/sleep (rand-int 5000))
      (let [res (rand-nth [true false])]
        (deliver link res)
        (deliver a res)))
    (future
      (Thread/sleep (rand-int 5000))
      (let [res (rand-nth [true false])]
        (deliver link res)
        (deliver b res)))
    {:or (or @link @a @b)  
     :a? (realized? a) 
     :b? (realized? b)
     :link @link
     :a @a 
     :b @b}))
  • 如果 a 首先返回 true,则 or 立即完成。
  • 如果 a 首先返回 false,则 @a 立即返回,然后阻塞在 @b 上。
  • 如果 b 首先返回 true,则 or 立即完成。
  • 如果 a 首先返回 false,则阻塞在 @a 上。

重复调用 (foo),你会看到预期的结果,特别是当 :ortrue 时,有时 :a?:b? 将为 false,但如果 :orfalse,两者都将始终为 true


感谢您的努力,但我认为这两种方法在我的情况下都不适用 - 我无法访问调用“deliver”的代码。我更新了问题以使其更明显。 - Dax Fohl
常见的情况是你想要从所有的未来中获取数据,但可以并行进行工作。短路会偏离这一点。我倾向于只使用or。最短的执行时间是在第一个未来在第二个未来完成之前产生true时,最长的执行时间是在第一个未来产生false时,并且第二个未来需要更多时间才能完成。 - Alex Taggart

3

现在,使用core.async可以实现这一点,具体方法见我回答的有关Clojure线程处理长时间运行过程并比较它们返回值的问题。那个回答定义了thread-and,而这个问题需要用到thread-or

(defn thread-or
  "Call each of the fs on a separate thread. Return logical
  disjunction of the results. Short-circuit (and cancel the calls to
  remaining fs) on first truthy value returned."
  [& fs]
  (let [futs-and-cs
        (doall (for [f fs]
                 (let [c (chan)]
                   [(future (>!! c (f))) c])))]
    (loop [futs-and-cs futs-and-cs]
      (let [[result c] (alts!! (map peek futs-and-cs))]
        (if result
          (do (doseq [fut (map first futs-and-cs)]
                (future-cancel fut))
              result)
          (let [new-futs-and-cs (remove #(identical? (peek %) c)
                                        futs-and-cs)]
            (if (next new-futs-and-cs)
              (recur new-futs-and-cs)
              (<!! (peek (first new-futs-and-cs))))))))))

测试常量假和常量真:

(thread-or (constantly true) (constantly true))
;;= true

(thread-or (constantly true) (constantly false))
;;= true

(thread-or (constantly false) (constantly false))
;;= false

另外请注意,短路计算确实有效:

;; prints :foo before returning true
(thread-or #(do (Thread/sleep 3000) true)
           #(do (Thread/sleep 1000) (println :foo)))

;; does not print :foo
(thread-or #(do (Thread/sleep 3000) true)
           #(do (Thread/sleep 7000) (println :foo)))

有没有可能与core.match一起使用,以便执行第一个匹配结果集的谓词? - Dax Fohl
你当然可以使用从通道返回的值与 core.match 一起使用,毕竟它们只是普通的Clojure值。在thread-or 变体中,当第一个真值出现时短路,你可以简单地返回这个值,然后对其使用 core.match。如果你想要一个 thread-and,并在最后使用core.match来计算任何一个值,那么你就必须将第一个返回值存储在本地循环变量中(将其作为第二个循环变量 res1 添加进去,初始值为nil,然后 (recur ... (if (nil? res1) result res1))...如前所述)。 - Michał Marczyk
当然,如果是真值情况下,你需要从thread-and返回它。 (除非您想将匹配线路连接到thread-and;或者您可以重写它,以便您可以从外部传递该逻辑作为函数。) - Michał Marczyk
其实,给我一分钟,我会发帖 thread-or 的;最初我本来想这么做的,但出于某种原因我却粘贴了 thread-and - Michał Marczyk
顺便提一下,在撰写上述评论时,我最初确信它是关于“其他”问题的;然后我注意到我也在这里粘贴了 thread-and。对于造成的混淆,我感到抱歉。 - Michał Marczyk
显示剩余2条评论

1

我非常喜欢这个问题。也许我感到惊讶的是,这是一个非常简单的新颖问题。无论如何,撇开口水不谈,我认为我有一些可以适用于您的东西。

与其这样做:

(if (or @future1 @future2)
  ...)

做:

(if (or (and (realized? future1) @future1)
        (and (realized? future2) @future2))
  ...)

这个技巧是在询问某事是否为“真”或“假”之前测试它是否已经实现。可以将其概括为以下宏:
(defmacro future-or [& futures]
  `(or ~@(for [f futures]
          `(and (realized? ~f)
                (deref ~f)))))

然后像这样使用:

(if (future-or future1 future2)
  ...)

等一下。也许我没有正确理解你的问题。也许你想要阻止执行,直到EITHER一个future完成并返回true,这种情况下你会执行你的if语句,或者两个future都完成了但没有一个返回true,这种情况下你会执行你的else语句。那就是另外一回事了。
我能想到最简洁的方式不是很漂亮,但也不是非常长:
(if (loop []
        (cond (or (and (realized? future1) @future1)
                  (and (realized? future2) @future2)) true
              (and (realized? future1) (realized? future2)
                   (not @future1) (not @future2)) false
              :else (recur)))
    ...)

现在,这个程序使用loop来重复循环,直到以下两种情况之一发生:要么其中一个future已经实现且为true,此时循环将返回true;要么所有的futures都已实现且全部为false,此时循环将返回false。在它们的父级(and ...)表达式末尾添加(not ...)表达式是很重要的,这样你就不会被困在检查任何future是否为truefalse的状态中,直到它们全部实现。

这个新的解决方案可以推广为:

(defmacro future-or [& futures]
  `(loop []
     (cond (or ~@(for [f futures]
                   `(and (realized? ~f)
                         (deref ~f)))) true
           (and ~@(for [f futures]
                    `(realized? ~f))
                ~@(for [f futures]
                    `(not (deref ~f)))) false
           :else (recur))))

使用与上述future-or示例相同的方式。

现在,我对优化几乎一无所知。但据我所知,这肯定不如理论上那么高效,因为一旦任何一个给定的未来被实现,就没有必要再次测试其值。好吧,这里有两个解决方案,我称之为future-some。由于在每次循环迭代后测试的未来可能会动态更改,因此我必须将其作为函数。从这个角度来看,这种新方法类似于some,而不是or。在某种程度上,我改变了行为,以获取一组未来(而不是单个参数的可变数量-someor之间的另一个区别)。其中一种解决方案不进行双重检查(我认为):

(defn future-some [futures]
  (if (empty? futures)
    false
    (let [res (reduce (fn [unrealized f]
                        (if (realized? f)
                          (if @f
                            (reduced true)
                            unrealized)
                          (cons f unrealized)))
                      ()
                      futures)]
      (if (true? res)
        true
        (recur res)))))

这里有很多细节需要说明,但要点是:如果没有未来可测试,则返回false;否则,它会迭代下未来列表,测试它们是否有任何实现。如果一个未来已经实现,并且也解除引用为true,则迭代中断以返回true。如果一个未来已经实现但未解除引用为true,则迭代继续到下一项。如果一个未来未实现,则将其添加到列表中以在future-some的下一次递归中使用。

另一个解决方案更加简洁,但有些不太优化:

(defn future-some [futures]
  (if (empty? futures)
    false
    (let [realized (filter realized? futures)]
      (if (some deref realized)
        true
        (recur (remove (set realized) futures))))))

类似于另一个方案,只是先过滤已实现的部分,然后测试,再次过滤(这次是反向的--以获取未实现的),如果需要重复,则进行第二次过滤。第二次过滤是低效的步骤。
我提出的所有解决方案的问题是,将来的取消操作会导致解除引用时出错,当它们应该像假一样继续运行。这可以通过在任何一个解除引用之前,在每个单独的(and ...)表达式中放置(not (future-cancelled? ...))表达式来解决。或者,对于future-some函数,您必须将realized?谓词替换为(some-fn (comp not future-cancelled?) realized?),或对于心虚的人,使用#(and (not (future-cancelled %)) (realized? %))。
再次感谢您的提问。

1
仅为促进思考过程,这个问题是在阅读以下内容后产生的:http://tomasp.net/blog/fsharp-variations-joinads.aspx。他们不得不创建一个特殊的编译器扩展来启用它,但最终它不仅允许上述操作,还允许对异步位进行完整的模式匹配。我认为Clojure是一种非常优雅的语言,可以为解决这个问题提供帮助。 - Dax Fohl
@DaxFohl 谢谢你的链接。也许有一天,如果我了解 F# 和/或单子,我会更加欣赏它。 - Omri Bernstein

0
假设您不想每隔X毫秒轮询未来,也无法控制未来/线程的创建或执行它们的fn,则解决方案就是创建更多的线程,每个线程都在等待一个未来:
(defn wait-for-any [& futures]
  (let [how-many-left (atom (count futures))
        p (promise)
        wait-and-notify (fn [f]
                         (fn []
                           (if @f
                             (deliver p true)
                             (when (zero? (swap! how-many-left dec))
                               (deliver p false)))))]
    (dorun (map (comp future-call wait-and-notify) futures))
    @p))

 (defn sleep-and-return [what-to-return sleep-time]
  (future
    (Thread/sleep sleep-time)
    what-to-return))


(time (println (wait-for-any
                (sleep-and-return false 1000)
                (sleep-and-return false 2000)
                (sleep-and-return false 3000)
                (sleep-and-return false 4000)
                (sleep-and-return false 5000))))

>>>false
>>>"Elapsed time: 5000.933906 msecs"

(time (println (wait-for-any
                (sleep-and-return false 1000)
                (sleep-and-return true 2900)
                (sleep-and-return true 3000)
                (sleep-and-return false 4000)
                (sleep-and-return false 5000))))
>>>true
>>>"Elapsed time: 2901.309695 msecs"

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接