如何使用转换器从另一个通道创建通道?

9
我想从另一个仅筛选特定消息的通道创建 clojure.core.async 通道。因此,我找到了一个名为 filter< 的函数。
=> (def c1 (chan))
=> (def c2 (filter< even? c1))
=> (put! c1 1)
=> (put! c1 2)
=> (<!! c2)
2

但是这个函数及其相关函数已被标记为过时:

过时 - 这个函数将会被移除。使用transducer替代。

有一些方法可以使用transducer来与通道结合,例如使用带有xform参数的chan函数。如何使用transducer从现有通道构建新的通道?

1个回答

8

我做了一些调查研究,找到了几篇有趣的文章(第一篇第二篇),然后使用pipeline实现了一些功能。

(require '[clojure.core.async :as async :refer [chan <!! pipeline put!]])
(def c1 (chan))
(def c2 (chan))

(pipeline 4 c2 (filter even?) c1)

(put! c1 1)
(put! c1 2)
(<!! c2)
;;=> 2

我链接的第二篇文章通过一些辅助函数使管道函数更加简洁:
(defn ncpus []
  (.availableProcessors (Runtime/getRuntime)))

(defn parallelism []
  (+ (ncpus) 1))

(defn add-transducer
  [in xf]
  (let [out (chan (buffer 16))]
    (pipeline (parallelism) out xf in)
    out))

然后你可以简单地使用通道绑定它们。
(def c1 (chan))
(def c2 (add-transducer c1 (filter even?))

为了完整回答问题,正如您自己发现的,您可以以类似的方式使用管道符(pipe):
(defn pipe-trans
  [ci xf]
  (let [co (chan 1 xf)]
    (pipe ci co)
    co))
(def c1 (chan))
(def c2 (pipe-trans c1 (filter even?)))

受到您的启发,我使用 pipe 创建了一个工厂方法。(defn from-chan [ci xf] (let [co (chan 1 xf)] (pipe ci co) co)) - sschmeck
很好,我很高兴能够帮助。 - Mark Fisher
1
我已经添加了你的工厂方法的一个版本,以便在其他人搜索时尝试完成答案。 - Mark Fisher
注意到这个:http://dev.clojure.org/jira/browse/ASYNC-153 看起来pipe-trans可以简化为(defn pipe-trans [ci xf] (pipe ci (chan 1 xf)))但是没有文档记录。 - Lof

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接