如何在 Scalaz-stream 中实现 receiveAvailable 转换器

3

简短版本:

我想实现一个函数,返回一个等待“发射”一块值的转换器。

我心目中的函数应该具有以下签名:

/**
 * The `Process1` which awaits the next "effect" to occur and passes all values emitted by
 * this effect to `rcv` to determine the next state.
 */
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???

详情:

我的理解是,我可以使用这个函数来实现下面的函数,我认为这将非常有用:

/**
  * Groups inputs into chunks of dynamic size based on the various effects
  * that back emitted values.
  *
  * @example {{{
  * val numberTask = Task.delay(1)
  * val listOfNumbersTask = Task.delay(List(5,6,7))
  * val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
  * sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
  * }}}
  */
  def chunkByEffect[I]: Process1[I, Vector[I]] = {
    receiveBlock(vec => emit(vec) ++ chunkByEffect)
  }

[更新] 更多详细信息

我的最终目标(略有简化)是实现以下功能:

/**
 * Transforms a stream of audio into a stream of text.
 */
voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]

该函数会对语音识别服务进行外部调用,因此每个流中的字节都进行网络调用是不合理的。在进行网络调用之前,需要将字节分块处理。可以将 audio 设为 Process [Task,ByteVector] ,但这需要测试代码知道函数支持的最大块大小,我更希望函数本身来管理它。另外,在该服务被用作其他服务内部时,服务本身会接收具有给定音频大小的网络调用,我希望 chunkXXX 函数能够智能地对数据进行分块,以便它不会占用已经可用的数据。
基本上,从网络传来的音频流将采用 Process [Task,ByteVector] 的形式,并通过flatMap(Process.emitAll(_)) 转换为 Process[Task, Byte]。然而,测试代码会直接产生一个 Process[Task, Byte] 并将其馈送到 voiceRecognition 中。理论上,如果提供适当的组合符号,应该可以提供 voiceRecognition 的实现,使其在这两个流中正确地完成操作,我认为上面描述的 chunkByEffect 函数就是其中的关键。我现在意识到,我需要让 chunkByEffect 函数具有minmax 参数,以指定块的最小和最大大小,而不考虑产生字节的基础 Task
2个回答

1
你需要以某种方式分隔字节。我建议使用一些高级抽象来处理字节流,例如ByteVector。
然后,您可能需要执行手动process1过程,其实现类似于process1.chunkBy,但它操作的是ByteVector。
def chunkBy(separator:ByteVector): Process1[ByteVector, ByteVector] = {
  def go(acc: ByteVector): Process1[ByteVector, ByteVector] =
    receive1Or[ByteVector,ByteVector](emit(acc)) { i =>
       // implement searching of separator in accumulated + new bytes
       ???
    }
  go(ByteVector.empty)
}

然后这将把所有东西连接在一起。
val speech: Process[Task,ByteVector] = ???
def chunkByWhatever: Process1[ByteVector,ByteVector] = ??? 
val recognizer: Channel[Task,ByteVector,String] = ???

//this shall do the trick
speech.pipe(chunkByWhatever).through(recognizer)

不是的。除非我漏掉了什么,否则它们中没有一个像我上面描述的chunkByEffect那样行为。也许我没有很好地描述我希望实现的内容。一旦我可以访问桌面,我会编辑问题。 - jedesah
什么是 Effect?就像任务评估一样吗? - Pavel Chlupacek
是的,任务评估就是我所说的效果。 - jedesah

0

我想目前的答案是,在scalaz-stream中实现这个功能非常困难或不可能。这个库的新版本叫做fs2,它对“分块”有一流的支持,这基本上就是我在这里寻找的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接