为什么`core.async/pipeline`返回一个通道?

5
我刚注意到`pipeline`系列返回了一个看似与管道及其相关通道完全独立的`channel`。
在下面的示例中,您可以分别从`pipes`和`a>/b>`中获取`>! / <!`,它们是不相关的。
据我所知,`pipeline`应该是一个无操作函数,并在设置从`a>`到`b>`的副作用传递时返回`nil`。
那么,我错过了什么,为什么`pipeline`会返回一个`channel`?
(def a> (chan))
(def b> (chan))
(def pipes (pipeline-blocking 4
                     b>
                     (map clojure.string/upper-case)
                     a>))
(go (>! pipes "hello world"))
(go (println "Pipes: " (<! pipes)))
(go (>! a> "apples are gooood"))
(go (println "B: " (<! b>)))
2个回答

8
您会获得一个通道,当没有更多元素可以复制时,该通道将被关闭。也就是说,在a>被关闭并且其中的所有元素都已变成大写字母并放置在b>上之后,您可以从生成的通道中使用!来判断流水线操作是否完成(如果您关心),或者您可以直接丢弃该通道。您可能不应该向其写入内容。
这是许多异步操作的常见模式,实际上经常隐式发生:每个go块都返回一个通道,在块完成时将其返回值写入通道,并且许多异步操作使用go块作为它们的返回值,因此您自动获得了这个“工作完成”通道作为结果。

这很有道理,让我有足够的材料来运行一个实验,我将其作为答案与你的一起发布。如果我的解释有误,请告诉我! - Josh.F

3
为了阐述@amalloy的答案,以下示例中,当a>b>能够完成时,它们会被赋予true。由于chan>是无缓冲的,它们在没有其他进程取出它们之前无法完成,例如末尾的println
如果chan>是有缓冲的,a>b>就可以立即完成>!,从而立即输出。
(def chan> (chan 4))
(def a> (go (>! chan> "Apple")))
(go (println "from a>: " (<! a>)))
(def b> (go (>! chan> "Ball")))
(go (println "from b>: " (<! b>)))

(go (println "from chan>: "(<! chan>)))
;; => from chan>: Apple
;; => from a>: true
(go (println "from chan>: "(<! chan>)))
;; => from chan>: Ball
;; => from b>: true

这与“管道(pipeline)”的概念相同。
;; Pipeline-specific

(def a> (chan))
(def b> (chan))
(def p> (pipeline 4 b> (map clojure.string/upper-case) a>))

;; this won't happen until `a>` `close!`s
(go (println "pipeline is done: " (<! p>)))

;; execute the following 2 lines ad lib
(go (>! a> "hi there"))
(go (println "from b>: " (<! b>)))

(comment
  (close! a>) ; triggers the "pipeline is done"
  (close! b>)) ; doesn't trigger it, but `b>` now only returns nil when taking

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接