如何使用core.async通道模拟Rx的`withLatestFrom`?

3
例如,给定一个包含操作的通道和另一个包含数据的通道,如何编写一个go块,以在数据通道上的最后一个值上应用操作?
(go-loop []
  (let [op (<! op-ch)
        data (<! data-ch)]
    (put! result-ch (op data))))

显然,这是行不通的,因为它要求两个频道具有相同的频率。(请参见http://rxmarbles.com/#withLatestFrom)
2个回答

1

alts!有一个:priority true选项。

一个总是返回某个通道中最新值的表达式可能看起来像这样:

(def in-chan (chan))

(def mem (chan))

(go (let [[ch value] (alts! [in-chan mem] :priority true)]
    (take! mem)      ;; clear mem (take! is non-blocking)
    (>! mem value)   ;; put the new (or old) value in the mem
    value            ;; return a chan with the value in

这个还没有经过测试,可能不够高效(使用 volatile 变量可能更好)。go 块返回的是仅包含值的 channel,但这个想法可以扩展成一些“记忆化”的 channel。


1

使用 alts! 可以实现您想要的功能。

with-latest-from 如下所示,实现了 RxJS 中 withLatestFrom 所具有的相同行为(我认为 :P)。

(require '[clojure.core.async :as async])

(def op-ch (async/chan))
(def data-ch (async/chan))

(defn with-latest-from [chs f]
  (let [result-ch (async/chan)
        latest    (vec (repeat (count chs) nil))
        index     (into {} (map vector chs (range)))]
    (async/go-loop [latest latest]
      (let [[value ch] (async/alts! chs)
            latest     (assoc latest (index ch) value)]
        (when-not (some nil? latest)
          (async/put! result-ch (apply f latest)))
        (when value (recur latest))))
    result-ch))

(def result-ch (with-latest-from [op-ch data-ch] str))

(async/go-loop []
  (prn (async/<! result-ch))
  (recur))

(async/put! op-ch :+)
;= true
(async/put! data-ch 1)
;= true
; ":+1"
(async/put! data-ch 2)
;= true
; ":+2"
(async/put! op-ch :-)
;= true
; ":-2"

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接