使用Scalaz 7 zipWithIndex/group enumeratees避免内存泄漏

106

背景

正如这个问题所指出的那样,我正在使用Scalaz 7 iteratees在恒定堆空间中处理大量(即无界)数据流。

我的代码看起来像这样:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))
问题

我似乎遇到了内存泄漏问题,但我对Scalaz/FP不够熟悉,无法确定错误是在Scalaz还是在我的代码中。直觉上,我希望这段代码仅需要大约P倍于Chunk大小的空间。

注意:我在类似的问题中发现了一个OutOfMemoryError,但我的代码没有使用consume

测试

我运行了一些测试来尝试隔离问题。总结起来,只有在同时使用zipWithIndexgroup时才会出现内存泄漏。

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

测试用例的代码:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

问题

  • 我的代码有 bug 吗?
  • 如何使这个程序在常量堆空间中运行?

6
我已经将此事报告为Scalaz的问题 - Aaron Novstrup
1
иҝҷеҸҜиғҪдёҚжҳҜеҫҲжңүи¶ЈпјҢдҪҶдҪ еҸҜд»Ҙе°қиҜ•дҪҝз”Ё-XX:+HeapDumpOnOutOfMemoryError并дҪҝз”ЁEclipse MAT http://www.eclipse.org/mat/еҲҶжһҗиҪ¬еӮЁпјҢд»ҘжҹҘзңӢе“ӘиЎҢд»Јз ҒжӯЈеңЁдҝқз•ҷж•°з»„гҖӮ - huynhjl
10
我尝试使用JProfiler和MAT分析堆,但是无法理清所有匿名函数类的引用等内容。Scala真的需要专门的工具来处理这种情况。 - Aaron Novstrup
如果没有泄漏的情况,而只是因为你所做的事情需要越来越多的内存,该怎么办呢?你可以很容易地在进行操作时维护一个var计数器,而不需要那个特定的函数式编程结构来复制zipWithIndex。 - Ezekiel Victor
@EzekielVictor 我不确定我理解这个评论。您是在建议每个块添加一个Long索引会将算法从常量堆空间更改为非常量堆空间吗?非压缩版本显然使用恒定的堆空间,因为它可以“处理”您愿意等待的任意数量的块。 - Aaron Novstrup
显示剩余2条评论
1个回答

4

对于那些被困在旧的iteratee API中的人来说,这可能并没有什么安慰,但最近我验证了一个等效测试针对scalaz-stream API。这是一个新的流处理API,旨在取代iteratee

为了完整起见,这里是测试代码:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

这应该适用于n参数的任何值(只要你愿意等足够长的时间) - 我测试了2 ^ 14个32MiB数组(即随着时间的推移分配了半TiB的内存总量)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接