我正在尝试创建一个接收流并写入文件的接收器:当达到特定条件(时间,文件大小等)时,当前输出流将关闭,并打开一个新的输出流用于写入到新的文件。
我检查了
我检查了
io
对象中不同接收器的创建方式,但实例很少。因此,我尝试跟随resource
和chunkW
的编写方式。下面是我得出的代码片段,为简单起见,桶现在只由一个Int
表示,但最终会成为某些类型的输出流。 val buckets: Channel[Task, String, Int] = {
//recursion to step through the stream
def go(step: Task[String => Task[Int]]): Process[Task, String => Task[Int]] = {
// Emit the value and repeat
def next(msg: String => Task[Int]) =
Process.emit(msg) ++
go(step)
Process.await[Task, String => Task[Int], String => Task[Int]](step)(
next
, Process.halt // TODO ???
, Process.halt) // TODO ???
}
//starting bucket
val acquire: Task[Int] = Task.delay {
val startBuck = nextBucket(0)
println(s"opening bucket $startBuck")
startBuck
}
//the write step
def step(os: Int): Task[String => Task[Int]] =
Task.now((msg: String) => Task.delay {
write(os, msg)
val newBuck = nextBucket(os)
if (newBuck != os) {
println(s"closing bucket $os")
println(s"opening bucket $newBuck")
}
newBuck
})
//start the Channel
Process.await(acquire)(
buck => go(step(buck))
, Process.halt, Process.halt)
}
def write(bucket: Int, msg: String) { println(s"$bucket\t$msg") }
def nextBucket(b: Int) = b+1
这里有几个问题:
step
在递归开始时被传递一次,之后不再更改。在递归的go
中,我不确定如何创建一个新的step
任务来使用前一任务中的bucket(Int),因为我必须提供一个字符串才能到达那个任务。fallback
和cleanup
函数没有接收到rcv
的结果(如果有的话)。在io.resource
函数中,这可以正常工作,因为资源是固定的,但在我的情况下,资源可能在任何步骤中更改。我该如何将对当前打开的bucket的引用传递给这些回调函数?
BucketedWriter extends Writer
,可以与resource
一起使用,但它相当命令式(实现Java API)。 - Mortimer