简短版本:
我想实现一个函数,返回一个等待“发射”一块值的转换器。
我心目中的函数应该具有以下签名:
/**
* The `Process1` which awaits the next "effect" to occur and passes all values emitted by
* this effect to `rcv` to determine the next state.
*/
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???
详情:
我的理解是,我可以使用这个函数来实现下面的函数,我认为这将非常有用:
/**
* Groups inputs into chunks of dynamic size based on the various effects
* that back emitted values.
*
* @example {{{
* val numberTask = Task.delay(1)
* val listOfNumbersTask = Task.delay(List(5,6,7))
* val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
* sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
* }}}
*/
def chunkByEffect[I]: Process1[I, Vector[I]] = {
receiveBlock(vec => emit(vec) ++ chunkByEffect)
}
[更新] 更多详细信息
我的最终目标(略有简化)是实现以下功能:
/**
* Transforms a stream of audio into a stream of text.
*/
voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]
该函数会对语音识别服务进行外部调用,因此每个流中的字节都进行网络调用是不合理的。在进行网络调用之前,需要将字节分块处理。可以将
audio
设为 Process [Task,ByteVector]
,但这需要测试代码知道函数支持的最大块大小,我更希望函数本身来管理它。另外,在该服务被用作其他服务内部时,服务本身会接收具有给定音频大小的网络调用,我希望 chunkXXX
函数能够智能地对数据进行分块,以便它不会占用已经可用的数据。基本上,从网络传来的音频流将采用
Process [Task,ByteVector]
的形式,并通过flatMap(Process.emitAll(_))
转换为 Process[Task, Byte]
。然而,测试代码会直接产生一个 Process[Task, Byte]
并将其馈送到 voiceRecognition
中。理论上,如果提供适当的组合符号,应该可以提供 voiceRecognition
的实现,使其在这两个流中正确地完成操作,我认为上面描述的 chunkByEffect
函数就是其中的关键。我现在意识到,我需要让 chunkByEffect 函数具有min
和 max
参数,以指定块的最小和最大大小,而不考虑产生字节的基础 Task
。
chunkByEffect
那样行为。也许我没有很好地描述我希望实现的内容。一旦我可以访问桌面,我会编辑问题。 - jedesah