如何加速Scalaz-Stream文本处理?

3
我该如何加速以下的scalaz-stream代码?目前处理70MB文本大约需要5分钟,因此我可能做错了什么,因为普通的scala等效代码只需要几秒钟。
(关于另一个问题的后续参考
  val converter2: Task[Unit] = {
    val docSep = "~~~"
    io.linesR("myInput.txt")
      .flatMap(line => { val words = line.split(" ");
          if (words.length==0 || words(0)!=docSep) Process(line)
          else Process(docSep, words.tail.mkString(" ")) })
      .split(_ == docSep)
      .filter(_ != Vector())
      .map(lines => lines.head + ": " + lines.tail.mkString(" "))
      .intersperse("\n")
      .pipe(text.utf8Encode)
      .to(io.fileChunkW("correctButSlowOutput.txt"))
      .run
  }

这只是一个猜测,也许 io.linesR.to(io.fileChunkW...) 部分没有使用缓冲流? - Dylan
我对这个特定情况不太确定,但是Scalaz通常会对字符执行许多通用操作,这会导致每个字符都被封装,从而使速度变慢。您是否已经检查过将其拆分为管道前和管道后操作会发生什么(即运行前半部分并将其存储在缓冲区中,然后输出后半部分)? - Rex Kerr
2个回答

0

我认为你可以使用其中一个process1块方法来进行分块。如果你想要在将行合并成输出格式时进行大量的并行处理,请决定是否重要并使用通道与合并或tee相结合。这也会使它可重复使用。因为你正在进行非常少量的处理,所以你可能会被开销淹没,所以你必须努力使你的工作单元足够大,以免被淹没。


0
以下是基于@user1763729的分块建议。虽然感觉有些笨重,但速度与原始版本一样慢。
  val converter: Task[Unit] = {
    val docSep = "~~~"
    io.linesR("myInput.txt")
      .intersperse("\n") // handle empty documents (chunkBy has to switch from true to false)
      .zipWithPrevious // chunkBy cuts only *after* the predicate turns false
      .chunkBy{ 
        case (Some(prev), line) => { val words = line.split(" "); words.length == 0 || words(0) != docSep } 
        case (None, line) => true }
      .map(_.map(_._1.getOrElse(""))) // get previous element
      .map(_.filter(!Set("", "\n").contains(_)))
      .map(lines => lines.head.split(" ").tail.mkString(" ") + ": " + lines.tail.mkString(" "))
      .intersperse("\n")
      .pipe(text.utf8Encode)
      .to(io.fileChunkW("stillSlowOutput.txt"))
      .run
  }

编辑:

实际上,仅仅读取文件(不写入或处理)就需要1.5分钟,所以我想加速这个过程的希望不大。

  val converter: Task[Unit] = {
    io.linesR("myInput.txt")
      .pipe(text.utf8Encode)
      .run
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接