Scalaz 7迭代器如何处理大型zip文件(OutOfMemoryError)

5

我正在尝试使用scalaz iteratee包在常量空间中处理大型zip文件。我需要对zip文件中的每个文件执行长时间运行的进程。这些进程可以(而且应该)并行运行。

我创建了一个EnumeratorT,它将每个ZipEntry解压成一个File对象。函数签名如下:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]

我想要附加一个IterateeT,它将在每个文件上执行长时间运行的过程。基本上,我最终得到了这样的东西:

type IOE[A] = IoExceptionOr[A]

def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }

当我尝试运行它时:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get

我收到了一个 java.lang.OutOfMemoryError: Java heap space 的错误消息。这对我来说是有道理的,因为它试图在内存中构建所有这些 IOPromise 对象的巨大列表。

一些问题:

  • 有人有任何想法如何避免这种情况吗?感觉我解决问题的方式不正确,因为我真正关心的只是 longRunningProcess 的副作用。
  • 这里的 Enumerator 方法是错误的方法吗?

我几乎没有什么想法了,所以任何帮助都会有所帮助。

谢谢!

更新#1

以下是堆栈跟踪:

[error] java.lang.OutOfMemoryError: Java heap space
[error]         at scalaz.Free.flatMap(Free.scala:46)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)

目前,我正在遵循nadavwr的建议确保一切都按照我的想法运作。如有更新,我会及时汇报。

更新 #2

结合下面两个答案的建议,我找到了一个不错的解决方案。正如huynhjl所建议的(并且我使用nadavwr的堆转储分析验证过的),consume 导致每个被填充的 ZipEntry 都被保存在内存中,这就是为什么进程会耗尽内存的原因。我将 consume 更改为 foldM,并将长时间运行的进程更新为返回 Promise[IOE[Unit]] 而不是文件的引用。这样最终我会得到所有IoExceptions的集合。以下是可行的解决方案:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }

这个解决方案会异步上传每个条目并将其扩展。最后,我有一个巨大的充满了许多错误的Promise对象的列表。我仍然不完全确信这是Iteratee的正确使用方式,但现在我有了几个可重复、可组合的部分,我可以在我们系统的其他部分中使用它们(对于我们来说这是非常常见的一种模式)。
感谢您的所有帮助!

长时间的进程是做什么的?它会从压缩内容中计算出什么吗? - huynhjl
压缩文件中的每个文件都是一个图像。长时间的过程将该文件上传到 Rackspace CloudFiles。一旦我弄明白了这一点,我将需要添加额外的流程来调整图像大小,然后再上传它们。 - RJ Regenold
迭代器感觉不是这项工作的正确抽象,因为您想要并行化工作负载。我认为使用Actor会更好。 - huynhjl
很有趣,因为演员实际上是我开始的地方,然后在某个地方读到说它们不适合半连续批处理。 推荐使用迭代器! 我同意,我挖掘得越深,它就越像错误的抽象。 我将尝试调试我的代码,因为我有一个想法,可以创建一个运行_N_个Promise的迭代器,阻塞直到获得响应,然后请求更多输入。这听起来合理吗?谢谢! - RJ Regenold
3个回答

4

谢谢你的回答。最终,使用 foldM 似乎是关键。 - RJ Regenold

1
你的longRunningProcess在内存方面有多昂贵?文件缩减呢?它们被执行的次数是否符合您的预期?(一个简单的计数器会很有帮助)。
堆栈跟踪将有助于确定压垮骆驼的最后一根稻草-有时这就是罪魁祸首。
如果您想确定占用了那么多内存,可以使用-XX:+HeapDumpOnOutOfMemoryError JVM参数,然后使用VisualVM、Eclipse MAT或其他堆分析器进行分析。
后续
对我来说,枚举承诺似乎很奇怪。独立于枚举器和迭代器启动计算是反直觉的。基于迭代器的解决方案可能更适合返回“惰性”元素的枚举器,而不是承诺。不幸的是,这将使您对单个文件的处理变得串行,但这就是迭代器-非阻塞流处理。
基于Actor的解决方案更适合我的意见,但是对于您要完成的任务(至少是您分享的部分),两者都过于复杂。
请考虑使用Scala 2.10的scala.concurrent包中的普通futures/promises,并确保查看Scala的并行集合。在这些方法未能满足需求之前,请勿将其他概念引入代码中。尝试定义一个固定大小的ExecutionContext来限制您的并行性。

好的建议。我正在逐步检查确保一切都像我所想的那样被执行。我已经在上面更新了我的问题并附上了堆栈跟踪信息。接下来我将尝试进行堆转储。谢谢! - RJ Regenold
关于您的后续:我同意您对使用Iteratee进行此过程的担忧。从我发布的内容来看,这显然是过度设计了。但是,在我们的应用程序中,下载文件(或文件),流式传输内容,处理每个条目,然后执行某些操作的模式随处可见。我觉得Iteratee为我提供了一些不错的、可重用的代码块,可以用来构建这些更大的过程。非常感谢您的时间和帮助! - RJ Regenold

0

我在快速阅读后开始回答,不知怎么地,脑海中一直想着“堆栈溢出”而不是“内存不足错误”……可能是URL的原因吧 :-)

然而,依赖递归的函数计算容易受到堆栈溢出的影响,所以我将答案留在这里供任何遇到此问题的人参考,并承诺尽力提供更相关的答案。

如果你遇到了堆栈溢出,你需要一个“跳板”,这个构造可以在递归之间将你的计算从堆栈中提升出来。

请参见Learning Scalaz Day 18中标题为“Stackless Scala with Free Monads”的部分,这是@eed3si9n的一系列优秀文章之一。

还可以参见@mpilquist的this gist,其中演示了一个跳板迭代器。


1
哈哈,当你谈论长时间运行的功能进程时,stackoverflow.com是一个不幸的名称。 - RJ Regenold

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接