这是绝对正确的设计,值得节省资源并仅为每个客户端使用一个连接来进行所有可能的操作。
然而,不要重复造轮子,使用协议可以提供您所有这些类型的通信。
- RSocket具有请求-响应模型,允许您进行最常见的客户端-服务器交互。
- RSocket具有请求流通信模型,因此您可以满足所有需求并异步返回一系列事件,同时重用同一连接。 RSocket执行逻辑流到物理连接的所有映射,并将其映射回来,因此您不需要自己处理痛苦的事情。
- RSocket具有更多交互模型,例如单向消息和流-流,在双向发送数据流时非常有用。
如何在Spring中使用RSocket
其中一种方法是使用RSocket协议的RSocket-Java实现。 RSocket-Java基于Project Reactor构建,因此它自然适用于Spring WebFlux生态系统。
不幸的是,没有与Spring生态系统进行特色集成。幸运的是,我花了几个小时提供了一个简单的RSocket Spring Boot Starter,将Spring WebFlux与RSocket集成,并公开WebSocket RSocket服务器以及WebFlux Http服务器。
RSocket是更好的方法吗?
基本上,RSocket隐藏了自己实现相同方法的复杂性。使用RSocket,我们不必担心交互模型的定义作为自定义协议以及作为Java中的实现。 RSocket为我们提供了向特定逻辑通道传送数据的内置客户端,因此我们不必为此发明自定义实现。
由于RSocket仅是协议,因此不提供任何消息格式,因此这是业务逻辑的挑战。但是,RSocket-RPC项目提供了Protocol Buffer作为消息格式,并重用与GRPC相同的代码生成技术。因此,使用RSocket-RPC,我们可以轻松构建客户端和服务器的API,并且完全不需要关心传输和协议抽象。
同样的RSocket Spring Boot集成也提供了RSocket-RPC使用的示例。
好吧,这没有说服我,我仍然想要一个自定义的WebSocket服务器
因此,为了实现这个目的,你必须自己实现。我以前已经做过一次,但是我不能指向那个项目,因为它是一个企业项目。
然而,我可以分享一些代码示例,可以帮助你构建一个合适的客户端和服务器。
服务器端
处理程序和开放逻辑订阅者映射
首先必须考虑的第一点是,一个物理连接中的所有逻辑流应该被存储在某个地方:
class MyWebSocketRouter implements WebSocketHandler {
final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;
@Override
public Mono<Void> handle(WebSocketSession session) {
final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
...
}
}
在上面的示例中有两个地图。第一个是路线映射,它允许您根据传入的消息参数来识别路线等。第二个是为请求流用例创建的(在我的情况下,它是活动订阅地图),因此您可以发送创建订阅的消息帧,或订阅特定操作并保持该订阅状态,因此一旦执行取消订阅操作,如果存在订阅,则会取消订阅。
使用处理器进行消息多路复用
为了将所有逻辑流的消息发送回来,您必须将消息多路复用到一个流中。例如,使用Reactor,您可以使用UnicastProcessor来实现:
@Override
public Mono<Void> handle(WebSocketSession session) {
final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
...
return Mono
.subscriberContext()
.flatMap(context -> Flux.merge(
session
.receive()
...
.cast(ActionMessage.class)
.publishOn(Schedulers.parallel())
.doOnNext(am -> {
switch (am.type) {
case CREATE:
case UPDATE:
case CANCEL: {
...
}
case SUBSCRIBE: {
Flux<ResponseMessage<?>> flux = Flux
.from(
channelsMapping.get(am.getChannelId())
.get(ActionMessage.Type.SUBSCRIBE)
.handle(am)
);
if (flux != null) {
channelsIdsToDisposableMap.compute(
am.getChannelId() + am.getSymbol(),
(cid, disposable) -> {
...
return flux
.subscriberContext(context)
.subscribe(
funIn::onNext,
e -> {
funIn.onNext(
new ResponseMessage<>(
0,
e.getMessage(),
...
ResponseMessage.Type.ERROR
)
);
}
);
}
);
}
return;
}
case UNSABSCRIBE: {
Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());
if (disposable != null) {
disposable.dispose();
}
}
}
})
.then(Mono.empty()),
funIn
...
.map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
.as(session::send)
).then()
);
}
从上面的示例中,我们可以看到有很多事情:
- 消息应包括路由信息
- 消息应包括与其相关的唯一流ID。
- 单独的处理器用于消息复用,其中错误也应作为消息
- 每个通道都应该在某个地方存储,在这种情况下,我们有一个简单的用例,每个消息可以提供消息的
Flux
或只是一个Mono
(在单声道的情况下,可以更简单地在服务器端实现,因此您不必保留唯一的流ID)。
- 此示例不包括消息编码解码,因此这个挑战留给你。
客户端
客户端也不那么简单:
处理会话
为了处理连接,我们必须分配两个处理器,以便我们可以使用它们来复用和解复用消息:
UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
(session) -> {
return Flux.merge(
session.receive()
.subscribeWith(incoming)
.then(Mono.empty()),
session.send(outgoing)
).then();
}
将所有逻辑流储存至某处
所有已创建的流(无论是Mono
还是Flux
)都应该被储存在某个地方,以便我们可以区分消息属于哪个流:
Map<String, MonoSink> monoSinksMap = ...
Map<String, FluxSink> fluxSinksMap = ...
由于MonoSink和FluxSink没有相同的父接口,我们需要保留两个映射。
消息路由
在上面的示例中,我们只考虑了客户端的初始部分。现在我们需要构建一个消息路由机制:
...
.subscribeWith(incoming)
.doOnNext(message -> {
if (monoSinkMap.containsKey(message.getStreamId())) {
MonoSink sink = monoSinkMap.get(message.getStreamId());
monoSinkMap.remove(message.getStreamId());
if (message.getType() == SUCCESS) {
sink.success(message.getData());
}
else {
sink.error(message.getCause());
}
} else if (fluxSinkMap.containsKey(message.getStreamId())) {
FluxSink sink = fluxSinkMap.get(message.getStreamId());
if (message.getType() == NEXT) {
sink.next(message.getData());
}
else if (message.getType() == COMPLETE) {
fluxSinkMap.remove(message.getStreamId());
sink.next(message.getData());
sink.complete();
}
else {
fluxSinkMap.remove(message.getStreamId());
sink.error(message.getCause());
}
}
})
以上代码示例展示了如何路由传入的消息。
多路请求
最后一个部分是消息多路复用。为此,我们将涵盖可能的发送者类实现:
class Sender {
UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;
public Sender () {
//在此处创建WebSocket连接并放置先前提到的代码
}
Mono<R> sendForMono(T data) );
}
Flux<R> sendForFlux(T data) );
}
}
自定义实现总结
- 困难重重
- 没有支持反压的实现,这可能是另一个挑战
- 很容易自己搞砸
要点
- 请使用RSocket,不要自己发明协议,这太难了!!!
- 想要了解更多关于RSocket的知识,请参考Pivotal团队的演讲视频
- 想要了解更多关于RSocket的知识,请参考我的一次演讲视频
- 有一个基于RSocket开发的特色框架叫做Proteus - 你可能会对它感兴趣 - https://www.netifi.com/
- 想要了解更多关于Proteus的知识,请参考RSocket协议的核心开发者的演讲视频