生成器/块转换为迭代器/流的转换

15
基本上,我想将这个转换为:
def data(block: T => Unit)

转换为流(dataToStream是一个假想的函数,用于执行此转换):

val dataStream: Stream[T] = dataToStream(data)

我想这个问题可以通过使用延续技术来解决:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it
dataList.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here a black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know how dataToStream function could look like?

感谢,Dawid。

寻找无人问津的答案或者有力的论据证明没有这样的答案,将会获得奖励。 - Dave Griffith
1
你的“块”没有产生任何值。如何将其转换为流?Unit是一个单例。 - Randall Schulz
所需的流是发送到“block”的参数系列,而不是这些调用的结果。 - Dave Griffith
1
你为什么需要Stream?有特殊原因吗?Traversable或TraversableView可以给你很多灵活性。map, flatMap, filter等都是惰性的。当调用像take这样的方法时,它会使用异常来防止每次调用时都要"block"。总而言之,在这里需要Stream似乎是不必要的,这需要(A)使用线程能够在“数据”函数和流迭代之间交换堆栈或(B)缓冲所有值并从此缓冲区创建一个Stream。尽管我希望有惊喜,但这更多是关于JVM上的工具。 - jsuereth
这只是一个例子。我不在乎最终使用Stream、Iterator还是Traversable。关键是将数据生成器转换为惰性、内存和CPU高效的“数据流”。 - Dawid Grzesiak
@Randall Schulz => 将产生值的块转换为流是一项简单的任务。请参见http://gist.github.com/603527 - Dawid Grzesiak
4个回答

11

编辑:修改示例以展示traversable.view的惰性特性

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

toTraversable方法将会把你的数据函数转换成一个可遍历的集合。单独来看,这并不是什么大不了的事情,但你可以将其转换为一个懒加载的TraversableView。以下是一个示例:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

take方法的不幸之处在于它必须多取一个值才能正确工作,但它会提前终止。如果没有“.view”调用,上面的代码看起来是一样的。然而,这里有一个更具有说服力的示例:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3

总之,我认为你需要的集合类型是TraversableView,它最容易通过创建可遍历对象并在其上调用“view”来创建视图。如果你真的想要Stream类型,这里有一种在2.8.0.final版本中可以不使用线程创建“Stream”的方法:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

这种方法的不幸之处在于,在创建流之前,它将遍历整个可遍历对象。这也意味着所有的值都需要在内存中缓冲。唯一的替代方案是使用线程。
另外提一下:这也是倾向于将可遍历对象作为scalax.io.File方法“lines”、“chars”和“bytes”的直接返回的动机。

我的观点是,如果您使用TraversableView,则可以将数据视为“流”进行交互。通过要求类型“Stream”,您会限制自己。TraversableView 惰性的。 - jsuereth
嗯,这真的不是一个坏主意。有时候这个解决方案会足够(特别是当你想要遍历一排所有数据时),有时候则不行。请参见 http://gist.github.com/603569 。理想情况下,最后一个示例输出也应该是交错的。很可惜你无法为其创建流或迭代器,或者你可以,但它会先计算所有数据。如果你有一个流/迭代器,你可以同时使用两个或更多数据流。例如从一个迭代器中取(3) ,从另一个迭代器中取(10)。无论如何,这是一段了不起和有帮助的代码! - Dawid Grzesiak
使用线程时,如果您没有消耗完所有数据,线程将不会停止,而是被挂起。因此,它也有缺点... - Dawid Grzesiak
我的问题是关于所提供解决方案的一部分。我是指在一般情况下调用 .view 和 .toStream 的优缺点。流是惰性和内存高效的。 - Dawid Grzesiak
流是惰性和内存高效的,但是在创建流之前,toStream的默认实现必须缓冲整个集合。Iterable接口可以使toStream适当地变得懒惰。大多数集合都扩展了Iterable,因此在这种情况下,它们是惰性的。然而,流并不像你想象的那样内存高效。它们会记忆化->也就是说,它们会保留先前生成的值。在这种情况下,视图更加内存高效,因为它不会保留生成的值。 - jsuereth
显示剩余5条评论

3

这里有一个简单的解决方案,它生成一个线程来消费数据。它将数据发布到一个同步队列中。创建并返回了从队列中拉取数据的流:

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   

由于 CPS 的限制,这可能是 Scala 在 v2.8 之前的唯一解决方案。不幸的是,它比使用纯生成器慢了 170 倍。请参见 https://gist.github.com/a79c0a9669eea3d47eee - Dawid Grzesiak

2

我仍需要自己弄清楚如何做到这一点。 我怀疑答案在这里的某个地方:

编辑:删除了显示如何解决不同问题的代码。

编辑2:使用最初发布的代码http://gist.github.com/574873http://gist.github.com/580157,您可以执行以下操作:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data不接受代码块,但我认为这没关系,因为使用 continuation,调用者可以处理代码块。Generator 的代码可以在 github 的 gist 上看到。


嗯,你解决的问题和原帖提出的完全不同吧?原帖中的 data 函数调用了 block 函数十次,而他想把它转换成一个由十个元素组成的流。而你的 data 函数只调用了一次 block 函数。 - sepp2k
@sepp2k,嗯,确实是这样。我想继续是必要的。 - huynhjl
我尝试使用这个线程中的代码 https://dev59.com/xnE95IYBdhLWcg3wrP-w#3758084 但是没有成功。 - Dawid Grzesiak
是的,我之前尝试过。不幸的是,由于 CPS 的限制,它无法解决问题。请查看代码 http://gist.github.com/599575 它返回错误:类型不匹配; found : Unit @scala.util.continuations.cpsParam[Unit,Unit] required: Unit data { i => yld(i) } - Dawid Grzesiak
@Dawid 请看我添加到那段代码片段的注释。 - Daniel C. Sobral

2
这是一个基于分隔的延续实现,改编自@Geoff Reedy的实现:
import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接