在Clojure核心异步中增加延迟

5
当使用clojure.core.async时,是否有一种方法让一个通道等待第一个项目被放入它,然后等待一些短暂的时间,然后获取当前在通道上的所有项目(可能在等待时到达),并且获取所有项目而不阻塞?
也就是说,是否有一种实现get-available-items的方法:
(defn delayer [ch ch2]
  (go (loop []
        (when-let [v (<! ch)]
          (<! (timeout 500))
          (let [vs (get-available-items ch)
                items (cons v vs)]
            (>! ch2 items))
          (recur)))))

基本上,类似于Java中的BlockingQueue.drain这样的东西。
2个回答

6

现在虽然还不能使用频道提供此功能,但我们有计划提供此功能。不过您可以使用以下代码检查频道中是否存在某个内容:

(alts!! [my-chan] :default :nothing-immediately-in-chan)

通过迭代,你可以在不阻塞的情况下排空一个通道。
PS:特别感谢 #clojure 上的 tbaldridge 和 julianlevis 提供的帮助。

2

您可以在同一超时通道上等待,直到“等待时间”用尽,同时收集任何传入值。

这些似乎有效:

(require '[clojure.core.async :as a :refer [<! >! go chan]])

(defn delayer [in out]
  (a/go-loop []
    (when-let [v (<! in)]
      (loop [batch [v] timeout-ch (a/timeout 500)]
        (let [[v ch] (a/alts! [in timeout-ch])]
          (if (= in ch)
            (recur (conj batch v) timeout-ch)
            (>! out batch))))
      (recur))))

请注意,我们只创建了一次超时通道并重复使用它。一个简单的测试来证明它的工作原理:
(def out (chan))
(def in (chan))

(delayer in out)

; print batches as soon as available
(a/go-loop []
  (>pprint (str (java.util.Date.) (<! out)))
  (recur))

 ; put a value every 100 millis
(a/go-loop [i 100]
  (when-not (zero? i)
    (<! (a/timeout 100))
    (>! in i)
    (recur (dec i))))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接