Scalaz-stream中相当于Play Framework的Enumerator.fromCallback是什么?

5
Play Framework的迭代器库定义了一个名为Enumerator.fromCallback的方法,该方法允许基于Future的结果生成元素。
请参考:http://www.playframework.com/documentation/2.2.x/Enumerators
def fromCallback[E](
  retriever: () => Future[Option[E]],
  onComplete: () => Unit = () => (),
  onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ()
): Enumerator[E]

你可以在这里看到一个很好的例子,展示了如何使用它从 Web 服务中获取分页结果: http://engineering.klout.com/2013/01/iteratees-in-big-data-at-klout/
def pagingEnumerator(url:String):Enumerator[JsValue]={
  var maybeNextUrl = Some(url) //Next url to fetch
  Enumerator.fromCallback[JsValue] ( retriever = {
    val maybeResponsePromise =
      maybeNextUrl map { nextUrl=>  
        WS.url(nextUrl).get.map { reponse =>
          val json = response.json
          maybeNextUrl = (json \ "next_url").asOpt[String]
          val code = response.status //Potential error handling here
          json
        }   
      }

    /* maybeResponsePromise will be an Option[Promise[JsValue]].
     * Need to 'flip' it, to make it a Promise[Option[JsValue]] to
     * conform to the fromCallback constraints */
    maybeResponsePromise match {
      case Some(responsePromise) => responsePromise map Some.apply
      case None => PlayPromise pure None
    }
  })
}

有没有相应的scalaz-stream代码可以完成同样的任务?我相信可以使用Process.emitProcess.await或者Process.eval来实现,但我想看一个具体的例子。这可能需要将Scala Future转换为Scalaz Task,这里有一个答案:将Scala 2.10 Future转换为Scalaz.concurrent.Future // Task
如果简化一些,我们可以忽略Scala Future与Scalaz Task的区别,并假设我们有一个Task。

仓库中有一个示例,展示了如何从回调函数创建进程:CreatingStreams.scala - Frank S. Thomas
1个回答

2

要从scala.concurrent.Future得到scalaz.concurrent.Task,您可以使用Task.async,当您手头有任务时,可以这样做:

  import java.util.concurrent.atomic.AtomicInteger
  import scalaz.concurrent.Task
  import scalaz.stream.Process.End
  import scalaz.stream._

  val cnt = new AtomicInteger(0)

  val task: Task[String] = Task {
    if (cnt.incrementAndGet() <= 10) s"Task ${cnt.get}" else throw End
  }

  Process.repeatEval(task).runLog.run.foreach(println)

我添加了Task.async步骤,您可以在此处查看完整示例:https://gist.github.com/ezhulenev/10553038 - Eugene Zhulenev

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接