Spring Boot RSocketRequester如何处理服务器重启问题

11

我对 Springs 的 RSocketRequester 有一个问题。我有一个 RSocket 服务器和客户端。客户端连接到此服务器并请求 @MessageMapping 终端点,一切都如预期运行。

但是如果我重新启动服务器,客户端如何自动重新连接到 RSocket 服务器呢?谢谢

服务器:

@Controller
class RSC {

    @MessageMapping("pong")
    public Mono<String> pong(String m) {
        return Mono.just("PONG " + m);
    }
}

客户:

@Bean
    public RSocketRequester rSocketRequester() {
        return RSocketRequester
                .builder()
                .connectTcp("localhost", 7000)
                .block();

    }

@RestController
class RST {

    @Autowired
    private RSocketRequester requester;

    @GetMapping(path = "/ping")
    public Mono<String> ping(){
        return this.requester
                .route("pong")
                .data("TEST")
                .retrieveMono(String.class)
                .doOnNext(System.out::println);
    }
}

你能否在这里分享你的解决方案? - RamPrakash
3个回答

13

已更新至 Spring Framework 5.2.6+ 版本

您可以使用io.rsocket.core.RSocketConnector#reconnect来实现。

@Bean
Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rSocketRequesterBuilder) {
    return rSocketRequesterBuilder
            .rsocketConnector(connector -> connector
                    .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .connectTcp("localhost", 7000);
}

@RestController
public class RST {
    @Autowired
    private Mono<RSocketRequester> rSocketRequesterMono;

    @GetMapping(path = "/ping")
    public Mono<String> ping() {
        return rSocketRequesterMono.flatMap(rSocketRequester ->
                rSocketRequester.route("pong")
                        .data("TEST")
                        .retrieveMono(String.class)
                        .doOnNext(System.out::println));
    }
}

1
感谢您的回答,我结合了两种方法,现在已经得到了可行的解决方案。非常感谢您的帮助。 - George
@George,你能否在这里分享一下你的解决方案? - RamPrakash
从Spring 5.2.6版本开始,RSocketRequester.Builder.rsocketFactory方法已被弃用,因此上述解决方案不再可用。替代方法是使用RSocketRequester.Builder.rsocketConnector。有没有人有使用rsockeConnector的示例代码? - David V
@DavidV,我已经更新了新的API答案。现在它不那么困难了。 - Alexander Pankin

6

我认为在应用程序中不需要创建RSocketRequester bean。与WebClient(具有可重用连接池)不同,RSocket请求者包装了单个RSocket即单个网络连接。

我认为最好是存储一个Mono<RSocketRequester>并在需要时订阅它以获得实际的请求者。因为您不希望为每个调用创建新的连接,所以可以缓存结果。由于Mono retryXYZ运算符,您可以通过多种方式细化重新连接行为。

您可以尝试以下内容:

@Service
public class RSocketPingService {

    private final Mono<RSocketRequester> requesterMono;

    // Spring Boot is creating an auto-configured RSocketRequester.Builder bean
    public RSocketPingService(RSocketRequester.Builder builder) {
        this.requesterMono = builder
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .connectTcp("localhost", 7000).retry(5).cache();
    }

    public Mono<String> ping() {
        return this.requesterMono.flatMap(requester -> requester.route("pong")
                .data("TEST")
                .retrieveMono(String.class));
    }


}

当我使用你的代码时,结果是相同的。我得到了异常java.nio.channels.ClosedChannelException: null这就是我想要解决的问题。当与rsocket服务器的连接中断时,如何自动修复,当服务器再次启动时。我正在研究更多关于RSocketLoadBalancedMono,看看它是否可以解决我的问题或者潜在地使用Spring的重试机制。你认为呢? - George
谢谢你的答复。我对代码进行了一点更改,这就是为什么第一次运行失败的原因。所以当服务器重新上线时,它会重新订阅此 Mono 并重新连接,这就是我需要的。非常感谢。 - George
1
@George,你能分享一个可行的答案吗?谢谢! - simbo1905
1
@George +1 你能分享一下完整的工作解决方案吗?我也需要。谢谢! - javaistaucheineinsel

1
这里的答案https://dev59.com/clMH5IYBdhLWcg3wpgzz#58890649是正确的。唯一需要补充的是reactor.util.retry.Retry有很多选项可用于配置重试逻辑,甚至包括日志记录。
因此,我会稍微改进原始答案,以便我们增加重试之间的时间直到达到最大值(16秒),并在每次重试之前记录失败-这样我们就可以监视连接器的活动:
@Bean
Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder builder) {

    return builder.rsocketConnector(connector -> connector.reconnect(Retry.backoff(Integer.MAX_VALUE, Duration.ofSeconds(1L))
                                                                          .maxBackoff(Duration.ofSeconds(16L))
                                                                          .jitter(1.0D)
                                                                          .doBeforeRetry((signal) -> log.error("connection error", signal.failure()))))
                  .connectTcp("localhost", 7000);

}
 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接