使用Scala迭代器将大型流(从字符串)分成块,使用正则表达式匹配,然后对这些块进行操作?

6

我目前使用的是一种不太像Scala的方法来解析大型Unix邮箱文件。我仍在学习这种语言,并希望挑战自己找到更好的方法,但是我认为自己还没有完全掌握如何使用Iterator以及如何有效地使用它。

我目前使用的是 org.apache.james.mime4j,我使用 org.apache.james.mime4j.mboxiterator.MboxIterator 从文件中获取一个 java.util.Iterator,代码如下:

 // registers an implementation of a ContentHandler that
 // allows me to construct an object representing an email
 // using callbacks
 val handler: ContentHandler = new MyHandler();

 // creates a parser that parses a SINGLE email from a given InputStream
 val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build());
 // register my handler
 parser.setContentHandler(handler);

 // Get a java.util.Iterator
 val iterator = MboxIterator.fromFile(fileName).build();
 // For each email, process it using above Handler
 iterator.forEach(p => parser.parse(p.asInputStream(Charsets.UTF_8)))

根据我的理解,Scala中的Iterator更加健壮,并且可能更能够处理这样的内容,特别是当我无法将整个文件放入内存时。我需要构建自己版本的MboxIterator。我查看了MboxIterator的源代码并找到了一个很好的正则表达式模式,用于确定单个电子邮件消息的开头,但是在此之后我不知道该怎么做。我像下面这样创建了正则表达式:
 val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE);

我想做的事情(基于我目前所知):

  • 从MBOX文件创建FileInputStream
  • 使用Iterator.continually(stream.read())遍历流。
  • 使用.takeWhile()继续读取直到流的末尾。
  • 使用类似MESSAGE_START.matcher(someString).find()的东西分块流,或者利用它找到分隔消息的索引。
  • 读取创建的块或读取索引之间的位。

我觉得我应该能够使用map()find()filter()collect()来完成这个任务,但是我被他们仅仅提供的Int所困扰。

我该怎么做呢?

编辑:

在对此进行更多思考后,我想到了另一种描述我认为需要做的事情的方法:

  1. 我需要不断地从流中读取,直到得到一个匹配我的正则表达式的字符串

  2. 也许要group之前读取的字节?

  3. 把它发送到某个地方进行处理

  4. 从范围中删除它,以便下次遇到匹配时不会分组

  5. 继续读取流,直到找到下一个匹配。

  6. 利润???

编辑2:

我觉得我已经接近了。使用这样一个方法可以让我得到一个迭代器的迭代器。然而,有两个问题:1. 这是浪费内存吗?这是否意味着所有东西都被读入内存中?2. 我仍然需要找出一种通过match进行分割但仍包含在返回的迭代器中的方法。

def split[T](iter: Iterator[T])(breakOn: T => Boolean): 
    Iterator[Iterator[T]] =
        new Iterator[Iterator[T]] {
           def hasNext = iter.hasNext

           def next = {
              val cur = iter.takeWhile(!breakOn(_))
              iter.dropWhile(breakOn)
              cur
            }
 }.withFilter(l => l.nonEmpty)  

1
尽管您已经非常好地解释了您正在尝试解决问题的方式,但您没有解释问题是什么。 您是要将现有的工作代码转换为使用Scala迭代器(如果是这样,请参见[Java conversion shims](https://docs.scala-lang.org/overviews/collections/conversions-between-java-and-scala-collections.html))吗? 您是否担心Java库的异常处理或内存使用情况? 乍一看,“MboxIterator”应该正确地流式传输文件内容(而不是将其全部加载到内存中)...... - Alec
@Alec 我想我对我找到的任何解决方案都不满意。它们应该更“scallaic”,即更简洁。我正在尝试使用正则表达式匹配来拆分一个大型文本文件中的对象,以匹配每个对象的第一行。使用正则表达式匹配分离字符串行流并处理每个组是我的核心问题。 - foxtrotuniform6969
1
你的 split() 方法可能有效,但它似乎违反了迭代器的第一个规则:“在调用迭代器方法后,永远不应该再使用迭代器。最重要的两个例外也是唯一的抽象方法:nexthasNext。”(来自 Scaladocs 页面。) - jwvh
@jwvh 有什么好的替代方案吗? - foxtrotuniform6969
1个回答

2
如果我理解正确,您想要根据可识别的正则表达式模式对大文件进行惰性分块。
您可以尝试为每个请求返回一个迭代器,但正确的迭代器管理并不是微不足道的。
我倾向于将所有文件和迭代器管理从客户端隐藏起来。
"最初的回答"
class MBox(filePath :String) {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :Option[String] =
    if (itr.hasNext) {
      val sb = new StringBuilder()
      sb.append(itr.next() + "\n")
      while (itr.hasNext && !header.matches(itr.head))
        sb.append(itr.next() + "\n")
      Some(sb.mkString)
    } else {
      file.close()
      None
    }
}

testing:

val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul  8 12:08:34 2011
//some text AAA
//some text BBB
//)

mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun  8 12:18:34 2012
//small text
//)

mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan  8 11:18:14 2013
//some text CCC
//some text DDD
//)

mbox.next()  //res3: Option[String] = None

每个打开的文件只有一个Iterator,并且只调用安全方法。文件文本仅在请求时实现(加载),如果可用,则客户端仅获取所请求的内容。如果适用,您可以将每行作为集合Seq[String]的一部分返回,而不是所有行组成的一个长字符串。


更新:这可以修改为易于迭代。

class MBox(filePath :String) extends Iterator[String] {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :String = {
    val sb = new StringBuilder()
    sb.append(itr.next() + "\n")
    while (itr.hasNext && !header.matches(itr.head))
      sb.append(itr.next() + "\n")
    sb.mkString
  }

  def hasNext: Boolean =
    if (itr.hasNext) true else {file.close(); false}
}

现在你可以使用.foreach().map().flatMap()等方法。但是你也可以做一些危险的事情,比如使用.toList,它会加载整个文件。最初的回答。

我还没有机会测试这个。但是,阅读它,它很有道理,看起来比我的实现要干净得多。谢谢! - foxtrotuniform6969
我应该如何添加foreachmap功能,因为我没有实现Iterator?我应该只在MBox值上使用while循环吗?或者,这是错误的问题,因为我从根本上误解了什么? - foxtrotuniform6969
为什么将 class MBox(filePath:String)extends Iterator [Option [String]]def hasNext:Boolean = itr.hasNext 结合起来,以便我可以使用 mapforeach 是错误的?我感觉对于 Iterator 的安全性和它如何发生变化,还有一些问题不太明白。 - foxtrotuniform6969

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接