Scalaz-stream如何实现`ask-then-wait-reply` TCP客户端

5

我希望实现一个客户端应用程序,它首先向服务器发送请求,然后等待其回复(类似于http协议)。

我的客户端流程可能如下:

 val topic = async.topic[ByteVector]
 val client = topic.subscribe

这里是API。
trait Client {
  val incoming = tcp.connect(...)(client)
   val reqBus = topic.pubsh()
   def ask(req: ByteVector): Task[Throwable \/ ByteVector] =  {
      (tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
      ???
   }
}

那么,如何实现ask的其余部分?
1个回答

6
通常情况下,实现是通过将消息发布到接收器,然后在某些源头(例如您的主题)等待某种回复来完成的。
实际上,我们的代码中有许多这样的习语。
def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
 merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}

首先,它会钩入回复流以等待任何确认我们发送请求的 O。然后,我们发布消息 I 并查询 pf 是否有任何传入的 O 最终将被翻译成 O2,然后终止。


TSource[O2]Process1[O,O2]是相同的吗? - ahjohannessen
实际上不是,这是我们代码库中 Process[Task,O2] 的类型别名,抱歉让您感到困惑。 - Pavel Chlupacek
2
啊,谢谢@pavel :) 很高兴能看到更多像这样在实际中使用的例子 :) 在scalaz-stream维基上添加常见模式的示例将有助于我们这些新手了解该库 :) - ahjohannessen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接