使用Scalaz Stream进行解析任务(替换Scalaz Iteratees)

48

介绍

我在许多项目中使用 Scalaz 7 的迭代器,主要用于处理较大的文件。我想开始转向使用 Scalaz stream, 它们旨在取代 iteratee 包(它缺少很多部分,而且使用起来很麻烦)。

Streams 基于 machines(迭代器思想的另一种变体),这些机器已经在 Haskell 中 得到实现。我用过 Haskell 的 machines 库,但机器和流之间的关系并不完全明显(至少对我来说),而 streams 库的文档还有点 稀疏

这个问题是关于一个简单的解析任务,我想看到使用 streams 来实现它,而不是使用迭代器。如果没有其他人先于我回答这个问题,我会回答这个问题,但我肯定不是唯一一个正在进行(或至少正在考虑)这个转变的人,因为我需要完成这个练习,所以我想在公共场合做。

任务

假设我有一个文件,其中包含已被分词和标记了词性的句子:

no UH
, ,
it PRP
was VBD
n't RB
monday NNP
. .

the DT
equity NN
market NN
was VBD
illiquid JJ
. .

每行一个令牌,单词和词性之间用一个空格分隔,空行表示句子边界。我想解析这个文件并返回一个句子列表,我们可以将其表示为字符串元组的列表:

List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.)

像往常一样,如果我们遇到无效的输入或文件读取异常,我们希望优雅地失败,不必担心手动关闭资源等问题。

一个迭代器解决方案

首先是一些通用的文件读取内容(实际上应该是迭代器包的一部分,但目前迭代器包没有提供任何高级功能):

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO
import iteratee.{ Iteratee => I, _ }

type ErrorOr[A] = EitherT[IO, Throwable, A]

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B](
  EitherT(action.catchLeft).map(I.sdone(_, I.emptyInput))
)

def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] {
  lazy val reader = r
  def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k =>
    tryIO(IO(Option(reader.readLine))).flatMap {
      case None       => s.pointI
      case Some(line) => k(I.elInput(line)) >>== apply[A]
    }
  )
}

def enumFile(f: File) = new EnumeratorT[String, ErrorOr] {
  def apply[A] = (s: StepT[String, ErrorOr, A]) => tryIO(
    IO(new BufferedReader(new FileReader(f)))
  ).flatMap(reader => I.iterateeT[String, ErrorOr, A](
    EitherT(
      enumBuffered(reader).apply(s).value.run.ensuring(IO(reader.close()))
    )
  ))
}

然后是我们的句子阅读器:
def sentence: IterateeT[String, ErrorOr, List[(String, String)]] = {
  import I._

  def loop(acc: List[(String, String)])(s: Input[String]):
    IterateeT[String, ErrorOr, List[(String, String)]] = s(
    el = _.trim.split(" ") match {
      case Array(form, pos) => cont(loop(acc :+ (form, pos)))
      case Array("")        => cont(done(acc, _))
      case pieces           =>
        val throwable: Throwable = new Exception(
          "Invalid line: %s!".format(pieces.mkString(" "))
        )

        val error: ErrorOr[List[(String, String)]] = EitherT.left(
          throwable.point[IO]
        )

        IterateeT.IterateeTMonadTrans[String].liftM(error)
    },
    empty = cont(loop(acc)),
    eof = done(acc, eofInput)
  )
  cont(loop(Nil))
}

最后,我们的解析操作:
val action =
  I.consume[List[(String, String)], ErrorOr, List] %=
  sentence.sequenceI &=
  enumFile(new File("example.txt"))

我们可以证明它是有效的:
scala> action.run.run.unsafePerformIO().foreach(_.foreach(println))
List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.))

我们完成了。

我的要求

使用Scalaz流而不是迭代器实现基本相同的程序。


4
机器实际上最初是在Scala中实现的,但我知道这不是重点。 - Apocalisp
1个回答

49

一种scalaz-stream解决方案:

import scalaz.std.vector._
import scalaz.syntax.traverse._
import scalaz.std.string._

val action = linesR("example.txt").map(_.trim).
  splitOn("").flatMap(_.traverseU { s => s.split(" ") match {
    case Array(form, pos) => emit(form -> pos)
    case _ => fail(new Exception(s"Invalid input $s"))
  }})

我们可以证明它的工作原理:
scala> action.collect.attempt.run.foreach(_.foreach(println))
Vector((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
Vector((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.))

我们完成了。

traverseU函数是一个常见的Scalaz组合子。在这种情况下,它被用于遍历splitOn生成的句子Vector,在Process单子中。它等同于map后跟sequence


4
+1,我印象非常深刻。但是如果出现无效输入的情况,如果我想要一个更具体的错误信息怎么办?在迭代器的情况下,我可以使用EnumerateeT.splitOn获取更多类似以下结果的结果,但这样会失去优雅地失败的能力。 - Travis Brown
@TravisBrown 编辑以反映优雅的错误处理要求。 - Apocalisp

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接