如何在Spring WebFlux中从多个Flux(WebsocketSession :: receive)正确地将值发送到Sink?

10
在我的简化案例中,我希望将由WebSocket客户端发送的消息广播到所有其他客户端。该应用程序是使用Spring的反应式WebSockets构建的。
我的想法是使用单个Sink,如果从客户端接收到消息,则在此Sink上发出它。WebsocketSession::send只是将由这个Sink发出的事件转发给连接的客户端。
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }

    fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)

    fun <T> fromJson(json : String, clazz : Class<T>) : T{
        return objectMapper.readValue(json, clazz)
    }

}

这种实现方式不够安全,因为Sink.emitNext可能会在不同的线程中被调用。

我尝试使用publishOn和单线程的Scheduler,以便为所有的WebSocketSession调用onNext方法,并且从单个线程中调用。然而,这种方法是不可行的。一个websocket客户端发出一个项目,然后所有后续websocket客户端立即收到onClose事件:

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val scheduler = Schedulers.newSingle("sink-scheduler")

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .publishOn(scheduler) // publish on single threaded scheduler
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        ...
    }

}

另一个选项是在一些公共锁上进行同步,以使发射变得线程安全 :

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val lock = Any()

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    synchronized(lock) {
                        sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                    }
                }
                .then()
        ...
    }


}

然而我不确定是否应该这样做。

问题是

在这种情况下,是否可以使用publishOn以使发射线程安全,如果不能,除了使用synchronized关键字进行同步之外,还有什么其他解决方案可用。

3个回答

10

与使用 synchronized 一样悲观的锁定方式不同,您可以创建一个类似于 FAIL_FASTEmitFailureHandler,但它会针对 EmitResult.NON_SERIALIZED_ACCESS 返回 true

这将导致并发发射尝试立即重试,就像在忙循环中一样。

乐观地讲,这将最终成功。如果您想要对无限循环进行额外的防御,甚至可以使自定义处理程序引入延迟或限制其返回 true 的次数。


好的,我会尝试一下。你知道为什么当我在单线程调度程序中使用publishOnonNext之前时,只有一个消息被发送到Sink,然后所有后续的websocket连接立即接收到onClose事件吗?在一个简单的独立应用程序中,这样的设置也可以解决问题,但在websocket的情况下,它会失败。 - Michał Krzywański
@michalk 的 emitNext 是硬编码的,如果检测到并发访问,它将以错误终止订阅者。tryEmitNext 是一种替代的较低级别 API,在这方面可以给您更多的控制,但您必须考虑每个可能的返回值(一个枚举)。emitFailureHandler 是一种中间地带。 - Simon Baslé
@SimonBaslé “你甚至可以让自定义处理程序引入延迟”-这不会潜在地阻塞一些重要的线程吗?或者等待时间很短,对系统影响不大? - Martin Tarjányi
@SimonBaslé SerializedFluxSink的Sinks.Many API等效是什么?我过去通过FluxProcessor.sink()获取它们,但在新的API中我错过了这个功能。 - adutra
"Sinks.many" 受到并发使用的保护,但它们会快速失败而不是试图变得过于智能。以前的方法是将事件放入队列中,并从单个线程中排除它,但这可能会有问题。由于 "tryEmit" API 返回结果,因此这已经不再是一个选项(您可能会在尝试执行 "tryEmitComplete" 时收到 "onNext" 的确认)。 - Simon Baslé

4

除了@simon-baslé的回答之外,这是(针对srping-webflux)的示例代码。它将下游请求发送给订阅者,并在Sinks.EmitResult.FAIL_NON_SERIALIZED响应的情况下重试。 这是Sinks.EmitFailureHandler的定义:

private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult
            .equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;

这里是将处理请求的控制器:

@org.springframework.web.bind.annotation.RestController
public class RestController {

    private final Many<String> sink = Sinks.many().multicast().directBestEffort();
    private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult
            .equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) ? true : false;
    @Autowired
    public RestController(ServiceSubscriber serviceSubscriber) {
        sink.asFlux().subscribe(serviceSubscriber);
    }

    @GetMapping(path = "/{id}")
    public Mono<ResponseEntity<Void>> getData(@PathVariable String id) {
        return Mono.fromCallable(() -> {
            sink.emitNext(id, emitFailureHandler);
            return ResponseEntity.ok().<Void>build();
        });
    }
}

1

使用单线程调度程序的publishOn方法应该可以工作,但是需要为每个ReactiveWebSocketHandler使用相同的调度程序实例。

您是否可以使用flatMap将所有receive() Fluxes组合在一起,而不是使用Sink?

我自己解决这个问题的方法采用了Simon建议的忙碌自旋方法(链接1)

请参阅我的答案类似问题的答案


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接