我在Clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存并以错误失败。似乎在core.async管道中使用mapcat会破坏反压(这是不幸的,原因超出了本问题的范围)。
以下是一些演示问题的代码,通过计算“:x”进入和离开“mapcat”转换器来说明问题:
生产者远超消费者。
看起来我不是第一个发现这个问题的人。但这里给出的解释似乎并不完全覆盖它(尽管它提供了一个足够的解决方法)。从概念上讲,我希望生产者领先,但只有在通道中缓存了少量消息的长度之内。
我的问题是,其他所有消息都在哪里?到输出的第四行时,有7000个
以下是一些演示问题的代码,通过计算“:x”进入和离开“mapcat”转换器来说明问题:
(ns mapcat.core
(:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
(let [message (repeat length :x)
input (async/chan)
transform (async/chan 1 (mapcat seq))
output (async/chan)
sent (atom 0)]
(async/pipe input transform)
(async/pipe transform output)
(async/go
(dotimes [_ n]
(async/>! input message)
(swap! sent inc))
(async/close! input))
(async/go-loop [x 0]
(when (= 0 (mod x (/ (* n length) 10)))
(println "in:" (* @sent length) "out:" x))
(when-let [_ (async/<! output)]
(recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
生产者远超消费者。
看起来我不是第一个发现这个问题的人。但这里给出的解释似乎并不完全覆盖它(尽管它提供了一个足够的解决方法)。从概念上讲,我希望生产者领先,但只有在通道中缓存了少量消息的长度之内。
我的问题是,其他所有消息都在哪里?到输出的第四行时,有7000个
:x
无法解释。