将scalaz-stream输入进程合并似乎会在标准输入上“等待”

7

我有一个简单的程序:

import scalaz._
import stream._

object Play extends App {
  val in1 = io.linesR("C:/tmp/as.txt")
  val in2 = io.linesR("C:/tmp/bs.txt")

  val p = (in1 merge in2) to io.stdOutLines
  p.run.run
}

文件as.txt包含五个a,文件bs.txt包含三个b。我看到这样的输出:
a
b
b
a
a
b
a
a
a

然而,当我将in2的声明更改如下:
val in2 = io.stdInLines

然后我得到了我认为是意外行为。根据文档1,程序应该从每个流中以非确定性方式拉取数据,根据哪个流更快地提供材料。这应该意味着我会立即在控制台上看到一堆a,但事实并非如此。
事实上,在我按下ENTER之前,什么都不会发生。很明显,行为看起来非常像我所期望的,如果我随机选择一个流来获取下一个元素,那么,如果该流被阻塞,则合并进程也会阻塞(即使另一个流包含数据)。
发生了什么? 1 - 好吧,文档很少,但Dan Spiewak他的演讲中非常清楚地说过,它会抓取第一个提供数据的人。
1个回答

6
问题在于stdInLines的实现。它是阻塞的,它从来没有Task.fork一个线程。
尝试将stdInLines的实现更改为以下内容:
def stdInLines: Process[Task,String] =
    Process.repeatEval(Task.apply { 
    Option(scala.Console.readLine())
    .getOrElse(throw Cause.Terminated(Cause.End))
})

原始的io.stdInLines在同一线程中运行readLine(),因此它总是等待您输入内容。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接