使用core.async等待n个通道

9
同样的,alt!等待n个通道中的一个获得值,我正在寻找等待所有n个通道都获得值的惯用方法。
我需要这个是因为我"spawn"了n个go块来处理异步任务,并且我想知道它们何时完成。我相信有一种非常优美的方法可以实现这一点。
2个回答

14

使用 core.asyncmap 函数:

(<!! (a/map vector [ch1 ch2 ch3]))
;; [val-from-ch-1 val-from-ch2 val-from-ch3]

如果你想知道,vector 是用于返回每个通道中第一个值的向量。我猜 seq 也可以,对吧? - Ory Band
Lambda函数对于a/map的调用需要与通道数量相同的参数,因此seq无法使用。也许您正在寻找a/merge? - Leon Grapenthin

5
你可以使用(mapv #(async/<!! %) channels)的方式。
如果你想要在收到每个值时进行特殊处理,并在最后一个通道产生值后执行某些特殊操作,那么可以利用 alts!/ alts!! 接受通道向量的特性。由于它们是函数而不是宏,所以你可以轻松地传入动态构建的向量。
因此,你可以使用alts!!来等待初始的n个通道,然后再次在剩余的通道上使用它。
(def c1 (async/chan))
(def c2 (async/chan))

(def out
  (async/thread
    (loop [cs [c1 c2] vs []]
      (let [[v p] (async/alts!! cs)
            cs (filterv #(not= p %) cs)
            vs (conj vs v)]
        (if (seq cs)
          (recur cs vs)
          vs)))))

(async/>!! c1 :foo)
(async/>!! c2 :bar)

(async/<!! out)
;= [:foo :bar]

如果您想从所有输入通道获取所有值,然后在它们全部关闭时执行其他操作,您需要使用async/merge

clojure.core.async/merge
([chs] [chs buf-or-n])
获取源通道的集合并返回包含它们所有取出值的通道。默认情况下,返回的通道将无缓冲区,或者可以提供一个buf-or-n。通道将在所有源通道关闭后关闭。


1
谢谢!我认为merge是我需要的。 - Blacksad
太好了!注意:我刚刚添加了等待每个通道中单个值的最简单解决方案(而不是在所有通道关闭之前等待所有值),即(mapv #(async/<!! %) channels) - 不知何故在最初版本中它消失了。 - Michał Marczyk

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接