我的想法是使用单个Sink,如果从客户端接收到消息,则在此Sink上发出它。
WebsocketSession::send
只是将由这个Sink
发出的事件转发给连接的客户端。@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })
return Mono.zip(input, output).then()
}
fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)
fun <T> fromJson(json : String, clazz : Class<T>) : T{
return objectMapper.readValue(json, clazz)
}
}
这种实现方式不够安全,因为Sink.emitNext
可能会在不同的线程中被调用。
我尝试使用publishOn
和单线程的Scheduler
,以便为所有的WebSocketSession
调用onNext
方法,并且从单个线程中调用。然而,这种方法是不可行的。一个websocket客户端发出一个项目,然后所有后续websocket客户端立即收到onClose事件:
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val scheduler = Schedulers.newSingle("sink-scheduler")
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.publishOn(scheduler) // publish on single threaded scheduler
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
...
}
}
另一个选项是在一些公共锁上进行同步
,以使发射变得线程安全 :
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val lock = Any()
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
synchronized(lock) {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
}
.then()
...
}
}
然而我不确定是否应该这样做。
问题是
在这种情况下,是否可以使用publishOn
以使发射线程安全,如果不能,除了使用synchronized
关键字进行同步之外,还有什么其他解决方案可用。
publishOn
在onNext
之前时,只有一个消息被发送到Sink
,然后所有后续的websocket连接立即接收到onClose
事件吗?在一个简单的独立应用程序中,这样的设置也可以解决问题,但在websocket的情况下,它会失败。 - Michał Krzywański