当mapcat在core.async中破坏了反压时,内存泄漏在哪里?

9
我在Clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存并以错误失败。似乎在core.async管道中使用mapcat会破坏反压(这是不幸的,原因超出了本问题的范围)。
以下是一些演示问题的代码,通过计算“:x”进入和离开“mapcat”转换器来说明问题:
(ns mapcat.core
  (:require [clojure.core.async :as async]))

(defn test-backpressure [n length]
  (let [message (repeat length :x)
        input (async/chan)
        transform (async/chan 1 (mapcat seq))
        output (async/chan)
        sent (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x))))))

=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000

生产者远超消费者。
看起来我不是第一个发现这个问题的人。但这里给出的解释似乎并不完全覆盖它(尽管它提供了一个足够的解决方法)。从概念上讲,我希望生产者领先,但只有在通道中缓存了少量消息的长度之内。
我的问题是,其他所有消息都在哪里?到输出的第四行时,有7000个:x无法解释。

在你提供的链接中,Alex提到这是一个错误结果和缓冲限制违规之间的困境。显然ASYNC-124更喜欢正确的答案。 - Davyzhu
关于你的问题,其他消息可能会在此处引用的“takers”中保留(https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86)。不太确定,所以让我们等待更有信心的答案。 - Davyzhu
1个回答

2
更新于2020年1月14日:内存泄漏现已修复。
对于“内存泄漏在哪里?”这个问题,有两种可能的解释。
首先,数据存储在哪里?答案似乎是在扩展转换器下游的通道缓冲区中。
默认情况下,通道使用FixedBuffer(clojure.core.async.impl.buffers/FixedBuffer),它可以判断是否已满,但不会拒绝超过容量。
其次,哪段代码导致缓冲区过载?这似乎是在ManyToManyChannelclojure.core.async.impl.channels/ManyToManyChannel take!方法中(如果我错了,请纠正我),其中对缓冲区的第一次add!调用发生在任何full?调用之前。
似乎 take! 假定每次移除一个项目时都可以向缓冲区添加至少一个项目。对于长时间运行的扩展 transducers,例如 mapcat,这并不总是一个安全的假设。
通过将 this line 更改为 (when (and (.hasNext iter) (not (impl/full? buf))),在本地副本中更改 core.async 的代码,我可以使问题中的代码按预期工作。(注意:我的 core.async 理解不足以保证这是您的用例的强大解决方案。)
更新 2016-09-17:现在有一个问题:http://dev.clojure.org/jira/browse/ASYNC-178 更新于2020年1月14日:现在已经修复,参见https://clojure.atlassian.net/browse/ASYNC-210(尽管早期的工单被关闭为“拒绝”)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接