Scalaz-stream中的Bucketed Sink

6
我正在尝试创建一个接收流并写入文件的接收器:当达到特定条件(时间,文件大小等)时,当前输出流将关闭,并打开一个新的输出流用于写入到新的文件。
我检查了io对象中不同接收器的创建方式,但实例很少。因此,我尝试跟随resourcechunkW的编写方式。下面是我得出的代码片段,为简单起见,桶现在只由一个Int表示,但最终会成为某些类型的输出流。
  val buckets: Channel[Task, String, Int] = {

    //recursion to step through the stream
    def go(step: Task[String => Task[Int]]): Process[Task, String => Task[Int]] = {

      // Emit the value and repeat
      def next(msg: String => Task[Int]) =
        Process.emit(msg) ++
          go(step)


      Process.await[Task, String => Task[Int], String => Task[Int]](step)(
        next
        , Process.halt // TODO ???
        , Process.halt) // TODO ???
    }

   //starting bucket
    val acquire: Task[Int] = Task.delay {
      val startBuck = nextBucket(0)
      println(s"opening bucket $startBuck")
      startBuck
    }

   //the write step
    def step(os: Int): Task[String => Task[Int]] =
      Task.now((msg: String) => Task.delay {
        write(os, msg)
        val newBuck = nextBucket(os)
        if (newBuck != os) {
          println(s"closing bucket $os")
          println(s"opening bucket $newBuck")
        }
        newBuck
      })

    //start the Channel
    Process.await(acquire)(
      buck => go(step(buck))
      , Process.halt, Process.halt)
  }

 def write(bucket: Int, msg: String) { println(s"$bucket\t$msg") }
 def nextBucket(b: Int) = b+1

这里有几个问题:

  1. step在递归开始时被传递一次,之后不再更改。在递归的go中,我不确定如何创建一个新的step任务来使用前一任务中的bucket(Int),因为我必须提供一个字符串才能到达那个任务。
  2. fallbackcleanup函数没有接收到rcv的结果(如果有的话)。在io.resource函数中,这可以正常工作,因为资源是固定的,但在我的情况下,资源可能在任何步骤中更改。我该如何将对当前打开的bucket的引用传递给这些回调函数?

好的,与此同时,我已经创建了自己的 BucketedWriter extends Writer,可以与 resource 一起使用,但它相当命令式(实现Java API)。 - Mortimer
1个回答

0

好的,其中一个选项(即时间)可能是在汇聚处使用简单的go。这个选项基于时间,每小时重新打开文件:

val metronome =  Process.awakeEvery(1.hour).map(true)


def writeFileSink(file:String):Sink[Task,ByteVector] = ???


def timeBasedSink(prefix:String) = {
  def go(index:Int) : Sink[Task,ByteVector] = {
    metronome.wye(write(prefix + "_" + index))(wye.interrupt) ++ go(index + 1)
  }

  go(0)
} 

对于其他选项(例如已写入的字节数),您可以使用类似的技术,只需保持已写入字节的信号并将其与Sink组合即可。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接