如何使用core.async正确地批量处理消息?

8

我希望能够通过计数和/或超时来批处理core.async通道上的消息(例如,10毫秒或10条消息,以先到者为准)。Tim Baldridge在批处理视频中提到了相关内容,但是他使用了core.async中已弃用的函数,而且没有使用transducers。我正在寻找类似以下的解决方案...

(defn batch [in out max-time max-count]
  ...
 )
1个回答

15

对于批处理函数来说,变压器实际上不应该是一个关注点——作为输入通道的接收者,它将看到经过该通道上任何变压器转换的值,并且任何侦听out的接收者也将看到该通道变压器转换的值。

至于实现,下面的函数将从in中获取max-count个项目的批次,或者自上次批次输出以来到达的项目数量,然后将它们输出到out,当输入通道关闭时关闭,受输入通道的变压器(如果有)的影响,并且任何侦听out的接收者也会应用该通道的变压器,如上所述:

(defn batch [in out max-time max-count]
  (let [lim-1 (dec max-count)]
    (async/go-loop [buf [] t (async/timeout max-time)]
      (let [[v p] (async/alts! [in t])]
        (cond
          (= p t)
          (do
            (async/>! out buf)
            (recur [] (async/timeout max-time)))

          (nil? v)
          (if (seq buf)
            (async/>! out buf))

          (== (count buf) lim-1)
          (do
            (async/>! out (conj buf v))
            (recur [] (async/timeout max-time)))

          :else
          (recur (conj buf v) t))))))

很棒的代码,简单而正确。我用它来批处理Redis PubSub消息(使用out作为发布者)。 - siphiuel
了不起的答案。 - john
我在想 clojure.core.async/take 是否适合这种情况,但无论如何,你都需要添加循环(和“永远不要阻塞永远”的超时!)所以说,归根结底,上面的实现看起来仍然非常可靠。 - Andrea Richiardi
在这段代码中,当输入通道关闭时,输出通道 chan 究竟在哪里被关闭了,正如答案所述? - Olim Saidov
@OlimSaidov 这是 (nil? v) 的情况 - tar
就我所看到的,(nil? v)这种情况没有关闭out。我正在使用一种修改过的版本,在这种情况下调用(close! out) - Andrew

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接