尝试让惰性求值适用于无限流

3

我正在尝试使用过滤操作实现一个无限流。我希望通过对尾部进行惰性求值来避免堆栈溢出错误。

abstract class MyStream[+A] {
  def head: A
  def tail: MyStream[A]

  def #::[B >: A](element: B): MyStream[B] // prepend operator

  def filter(predicate: A => Boolean): MyStream[A]
}

class FiniteStream[+A](val head: A, val tail: MyStream[A]) extends MyStream[A] {    
  override def #::[B >: A](element: B): MyStream[B] = new FiniteStream[B](element, this)

  override def filter(predicate: A => Boolean): MyStream[A] = {
    lazy val filteredTail = tail.filter(predicate)
    if (predicate(head)) filteredTail
    else filteredTail
  }
}

class InfiniteStream[+A](override val head: A, generator: A => A) extends MyStream[A] {
  override def tail: MyStream[A] = {
    lazy val tail = new InfiniteStream[A](generator(head), generator)
    tail
  }

  override def #::[B >: A](element: B): MyStream[B] =
    new FiniteStream[B](element, this)

  override def filter(predicate: A => Boolean): MyStream[A] = {
    lazy val filteredTail = tail.filter(predicate)
    if (predicate(head)) head #:: filteredTail
    else filteredTail
  }
}

object MyStream {
    def from[A](start: A)(generator: A => A): MyStream[A] = new InfiniteStream[A](start, generator)
}

val stream: MyStream[Int] = MyStream.from(1)((n: Int) => n + 1)
val filtered = stream.filter(_ % 2 == 0)

但是这个程序确实会因为堆栈溢出错误而崩溃。看起来我的惰性求值策略并没有奏效。尾部仍然在被计算。为什么?

1
FiniteStream?那是什么?在哪里/是什么? - jwvh
抱歉,忘记包含它了。现在已经有了。 - Sahand
1个回答

6
问题是由InfiniteStream.filter引起的,它将尾部过滤器创建为延迟值(lazy value),但立即访问该值会强制执行该值。这将导致整个流在递归调用中被评估,从而使堆栈溢出。

lazy val 延迟了构造变量的表达式的执行,直到被访问。因此,您需要延迟对tail.filter(predicate)的访问,直到流的使用者访问尾部时才进行访问。

最简单和更加功能化的方法是使用视图实现过滤(filter)。也就是说,过滤器返回一个新的流,仅在需要时筛选尾部。

例如:

class FilterStream[+A] private (predicate: predicate: A => Boolean, stream: MyStream) extends MyStream[A] {
  override def head: A = stream.head
  override def tail: MyStream[A] = FilterStream.dropWhile(!predicate(_), stream)
}


object FilterStream {
  def apply[A](predicate: predicate: A => Boolean, stream: MyStream[A]): MyStream[A] = {
    new FilterStream(predicate, dropWhile(!predicate(_), stream))
  }

  @tailrec
  def dropWhile[A](predicate: predicate: A => Boolean, stream: MyStream[A]): MyStream[A] = {
    if (stream.isEmpty || predicate(stream.head)) stream
    else dropWhile(predicate, stream.tail)
  }
}

最后,您应该考虑使用自己的类型和对象来实现一个空流,因为这样可以有很多原因,也可以让您在生成器决定终止无限流时终止它。

object Nil extends MyStream[Nothing] {
  override def head: A = throw NoSuchElement
  override def tail: MyStream[A] = throw NoSuchElement
}

head和tail方法总是不安全的,另一个改进是使用case类来公开流的形状,然后用户可以在流上模式匹配。这将保护您的用户不必使用像headtail这样的不安全方法。


1
谢谢你的回答。我相信在dropWhile方法的else子句中有一个未平衡的括号。 - Sahand

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接