我正在尝试使用scalaz iteratee包在常量空间中处理大型zip文件。我需要对zip文件中的每个文件执行长时间运行的进程。这些进程可以(而且应该)并行运行。
我创建了一个EnumeratorT
,它将每个ZipEntry
解压成一个File
对象。函数签名如下:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
我想要附加一个IterateeT
,它将在每个文件上执行长时间运行的过程。基本上,我最终得到了这样的东西:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
consume[Promise[IOE[File]], IO, List] %=
map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
Promise { Thread.sleep(5000); iof }
当我尝试运行它时:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
我收到了一个 java.lang.OutOfMemoryError: Java heap space
的错误消息。这对我来说是有道理的,因为它试图在内存中构建所有这些 IO
和 Promise
对象的巨大列表。
一些问题:
- 有人有任何想法如何避免这种情况吗?感觉我解决问题的方式不正确,因为我真正关心的只是
longRunningProcess
的副作用。 - 这里的
Enumerator
方法是错误的方法吗?
我几乎没有什么想法了,所以任何帮助都会有所帮助。
谢谢!
更新#1
以下是堆栈跟踪:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
目前,我正在遵循nadavwr的建议确保一切都按照我的想法运作。如有更新,我会及时汇报。
更新 #2
结合下面两个答案的建议,我找到了一个不错的解决方案。正如huynhjl所建议的(并且我使用nadavwr的堆转储分析验证过的),consume
导致每个被填充的 ZipEntry
都被保存在内存中,这就是为什么进程会耗尽内存的原因。我将 consume
更改为 foldM
,并将长时间运行的进程更新为返回 Promise[IOE[Unit]]
而不是文件的引用。这样最终我会得到所有IoExceptions的集合。以下是可行的解决方案:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
Promise { Thread.sleep(5000); iof.map(println) }
这个解决方案会异步上传每个条目并将其扩展。最后,我有一个巨大的充满了许多错误的
Promise
对象的列表。我仍然不完全确信这是Iteratee的正确使用方式,但现在我有了几个可重复、可组合的部分,我可以在我们系统的其他部分中使用它们(对于我们来说这是非常常见的一种模式)。感谢您的所有帮助!