Webflux websocketclient,如何在同一会话中发送多个请求[设计客户端库]

16

TL;DR;

Using Spring WebFlux WebSocket implementation, we are designing a WebSocket server with HTTP operations. We want to expose one endpoint using WebSocket to allow multiple operations through a single connection. Is this a suitable design?

Long Version

We intend to use reactive web sockets from spring-webflux for a new project. Our aim is to create a reactive client library that allows consumers to connect to the server.

For the server-side, we receive a request, read a message, save it, and return a static response:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
            .map(WebSocketMessage::retain)
            .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
                    .map(parseBinaryToEvent) //logic to get domain object
                    .flatMap(e -> service.save(e))
                    .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
            );

    return webSocketSession.send(response);
}

在客户端,当有人调用save方法时,我们希望发起一个调用,并返回来自server的响应。

public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
      session
              .send(Mono.just(session.binaryMessage(formatEventToMessage)))
              .then(session.receive()
                      .map(WebSocketMessage::getPayloadAsText)
                      .doOnNext(System.out::println).then()); //how to return this to client
    });
    return null;
}

我们不确定如何进行设计。理想情况下,我们认为应该有:

1)仅调用一次client.execute并以某种方式保存session。后续调用应使用相同的会话发送数据。

2)如何返回我们在session.receive中收到的服务器响应?

3)当session.receive中的响应非常大(不仅是静态字符串而是事件列表)时,在fetch的情况下怎么办?

我们正在进行一些研究,但我们无法在网上找到关于webflux-websocket-client文档/实现的合适资源。有什么建议可以帮助我们前进吗?

2个回答

23

请使用 RSocket

这是绝对正确的设计,值得节省资源并仅为每个客户端使用一个连接来进行所有可能的操作。

然而,不要重复造轮子,使用协议可以提供您所有这些类型的通信。

  • RSocket具有请求-响应模型,允许您进行最常见的客户端-服务器交互。
  • RSocket具有请求流通信模型,因此您可以满足所有需求并异步返回一系列事件,同时重用同一连接。 RSocket执行逻辑流到物理连接的所有映射,并将其映射回来,因此您不需要自己处理痛苦的事情。
  • RSocket具有更多交互模型,例如单向消息流-流,在双向发送数据流时非常有用。

如何在Spring中使用RSocket

其中一种方法是使用RSocket协议的RSocket-Java实现。 RSocket-Java基于Project Reactor构建,因此它自然适用于Spring WebFlux生态系统。

不幸的是,没有与Spring生态系统进行特色集成。幸运的是,我花了几个小时提供了一个简单的RSocket Spring Boot Starter,将Spring WebFlux与RSocket集成,并公开WebSocket RSocket服务器以及WebFlux Http服务器。

RSocket是更好的方法吗?

基本上,RSocket隐藏了自己实现相同方法的复杂性。使用RSocket,我们不必担心交互模型的定义作为自定义协议以及作为Java中的实现。 RSocket为我们提供了向特定逻辑通道传送数据的内置客户端,因此我们不必为此发明自定义实现。

使用RSocket-RPC使其更好

由于RSocket仅是协议,因此不提供任何消息格式,因此这是业务逻辑的挑战。但是,RSocket-RPC项目提供了Protocol Buffer作为消息格式,并重用与GRPC相同的代码生成技术。因此,使用RSocket-RPC,我们可以轻松构建客户端和服务器的API,并且完全不需要关心传输和协议抽象。

同样的RSocket Spring Boot集成也提供了RSocket-RPC使用的示例

好吧,这没有说服我,我仍然想要一个自定义的WebSocket服务器

因此,为了实现这个目的,你必须自己实现。我以前已经做过一次,但是我不能指向那个项目,因为它是一个企业项目。 然而,我可以分享一些代码示例,可以帮助你构建一个合适的客户端和服务器。

服务器端

处理程序和开放逻辑订阅者映射

首先必须考虑的第一点是,一个物理连接中的所有逻辑流应该被存储在某个地方:

class MyWebSocketRouter implements WebSocketHandler {

  final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;


  @Override
  public Mono<Void> handle(WebSocketSession session) {
    final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
    ...
  }
}

在上面的示例中有两个地图。第一个是路线映射,它允许您根据传入的消息参数来识别路线等。第二个是为请求流用例创建的(在我的情况下,它是活动订阅地图),因此您可以发送创建订阅的消息帧,或订阅特定操作并保持该订阅状态,因此一旦执行取消订阅操作,如果存在订阅,则会取消订阅。
使用处理器进行消息多路复用
为了将所有逻辑流的消息发送回来,您必须将消息多路复用到一个流中。例如,使用Reactor,您可以使用UnicastProcessor来实现:
@Override
public Mono<Void> handle(WebSocketSession session) {
  final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
  ...

  return Mono
    .subscriberContext()
    .flatMap(context -> Flux.merge(
      session
        .receive()
        ...
        .cast(ActionMessage.class)
        .publishOn(Schedulers.parallel())
        .doOnNext(am -> {
          switch (am.type) {
            case CREATE:
            case UPDATE:
            case CANCEL: {
              ...
            }
            case SUBSCRIBE: {
              Flux<ResponseMessage<?>> flux = Flux
                .from(
                  channelsMapping.get(am.getChannelId())
                                 .get(ActionMessage.Type.SUBSCRIBE)
                                 .handle(am) // returns Publisher<>
                );

              if (flux != null) {
                channelsIdsToDisposableMap.compute(
                  am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
                  (cid, disposable) -> {
                    ...

                    return flux
                      .subscriberContext(context)
                      .subscribe(
                        funIn::onNext, // send message to a Processor manually
                        e -> {
                          funIn.onNext(
                            new ResponseMessage<>( // send errors as a messages to Processor here
                              0,
                              e.getMessage(),
                              ...
                              ResponseMessage.Type.ERROR
                            )
                          );
                        }
                      );
                  }
                );
              }

              return;
            }
            case UNSABSCRIBE: {
              Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());

              if (disposable != null) {
                disposable.dispose();
              }
            }
          }
        })
        .then(Mono.empty()),

        funIn
            ...
            .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
            .as(session::send)
      ).then()
    );
}

从上面的示例中,我们可以看到有很多事情:

  1. 消息应包括路由信息
  2. 消息应包括与其相关的唯一流ID。
  3. 单独的处理器用于消息复用,其中错误也应作为消息
  4. 每个通道都应该在某个地方存储,在这种情况下,我们有一个简单的用例,每个消息可以提供消息的Flux或只是一个Mono(在单声道的情况下,可以更简单地在服务器端实现,因此您不必保留唯一的流ID)。
  5. 此示例不包括消息编码解码,因此这个挑战留给你。

客户端

客户端也不那么简单:

处理会话

为了处理连接,我们必须分配两个处理器,以便我们可以使用它们来复用和解复用消息:

UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
(session) -> {
  return Flux.merge(
     session.receive()
            .subscribeWith(incoming)
            .then(Mono.empty()),
     session.send(outgoing)
  ).then();
}

将所有逻辑流储存至某处

所有已创建的流(无论是Mono还是Flux)都应该被储存在某个地方,以便我们可以区分消息属于哪个流:

Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;

由于MonoSink和FluxSink没有相同的父接口,我们需要保留两个映射。

消息路由

在上面的示例中,我们只考虑了客户端的初始部分。现在我们需要构建一个消息路由机制:

...
.subscribeWith(incoming)
.doOnNext(message -> {
    if (monoSinkMap.containsKey(message.getStreamId())) {
        MonoSink sink = monoSinkMap.get(message.getStreamId());
        monoSinkMap.remove(message.getStreamId());
        if (message.getType() == SUCCESS) {
            sink.success(message.getData());
        }
        else {
            sink.error(message.getCause());
        }
    } else if (fluxSinkMap.containsKey(message.getStreamId())) {
        FluxSink sink = fluxSinkMap.get(message.getStreamId());
        if (message.getType() == NEXT) {
            sink.next(message.getData());
        }
        else if (message.getType() == COMPLETE) {
            fluxSinkMap.remove(message.getStreamId());
            sink.next(message.getData());
            sink.complete();
        }
        else {
            fluxSinkMap.remove(message.getStreamId());
            sink.error(message.getCause());
        }
    }
})

以上代码示例展示了如何路由传入的消息。

多路请求

最后一个部分是消息多路复用。为此,我们将涵盖可能的发送者类实现:

class Sender {
    UnicastProcessor<> outgoing = ...
    UnicastPorcessor<> incoming = ...

    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;

    public Sender () {

//在此处创建WebSocket连接并放置先前提到的代码 }

    Mono<R> sendForMono(T data) {
        //generate message with unique 
        return Mono.<R>create(sink -> {
            monoSinksMap.put(streamId, sink);
            outgoing.onNext(message); // send message to server only when subscribed to Mono
        });
    }

     Flux<R> sendForFlux(T data) {
         return Flux.<R>create(sink -> {
            fluxSinksMap.put(streamId, sink);
            outgoing.onNext(message); // send message to server only when subscribed to Flux
        });
     }
}

自定义实现总结

  1. 困难重重
  2. 没有支持反压的实现,这可能是另一个挑战
  3. 很容易自己搞砸

要点

  1. 请使用RSocket,不要自己发明协议,这太难了!!!
  2. 想要了解更多关于RSocket的知识,请参考Pivotal团队的演讲视频
  3. 想要了解更多关于RSocket的知识,请参考我的一次演讲视频
  4. 有一个基于RSocket开发的特色框架叫做Proteus - 你可能会对它感兴趣 - https://www.netifi.com/
  5. 想要了解更多关于Proteus的知识,请参考RSocket协议的核心开发者的演讲视频

1
我刚开始学习WebSockets,很喜欢你的回答。你了解STOMP吗?Spring-integration自带了STOMP,你觉得这和使用RSocket相比如何呢(如果这有任何意义的话)? - Rüdiger Schulz
4
@RüdigerSchulz 最后一次STOMP更新(根据规范)是在7年前……确实不太好。另一方面,Java客户端/服务器的维护也已经过时。我建议看一下RSocket。现在,Spring团队正在完成RSocket和Spring-Messaging集成的工作,所以几个月后它将公开发布。更多信息请查看 -> https://github.com/spring-projects/spring-framework/tree/master/spring-messaging/src/main/java/org/springframework/messaging/rsocket - Oleh Dokuka
如果我有一个React Native客户端应用程序,我可以使用这个协议吗?我想知道,因为我无法连接到服务器,而且通过WebSocket一切都运行良好,当然路由不行,我不太能够实现它。 - Gergo

3

不确定这是否是您的问题?? 我看到您正在发送静态flux响应(这是可关闭的流) 您需要打开的流以向该会话发送消息,例如,您可以创建一个处理器

public class SocketMessageComponent {
private DirectProcessor<String> emitterProcessor;
private Flux<String> subscriber;

public SocketMessageComponent() {
    emitterProcessor = DirectProcessor.create();
    subscriber = emitterProcessor.share();
}

public Flux<String> getSubscriber() {
    return subscriber;
}

public void sendMessage(String mesage) {
    emitterProcessor.onNext(mesage);
}

}

然后你可以发送

 public Mono<Void> handle(WebSocketSession webSocketSession) {
    this.webSocketSession = webSocketSession;
    return webSocketSession.send(socketMessageComponent.getSubscriber()
            .map(webSocketSession::textMessage))
            .and(webSocketSession.receive()
                    .map(WebSocketMessage::getPayloadAsText).log());
}

嗨,感谢您的回复。我已经尝试过了,但有没有办法区分服务器发送的响应实体,例如fetchByIDfetchAll之间的区别。当您尝试利用同一连接/会话进行多个请求时,如何区分这两者? - Mritunjay
所以你需要发出不同类型的对象? - Ricard Kollcaku
1
抱歉回复晚了。如问题所述,我们正在尝试设计一个WebSocket服务器,其中包含通常的HTTP服务器操作,例如“创建/获取/更新”。使用WebSocket,我们试图公开一个端点,以便客户端可以利用单个连接进行各种操作,因为WebSocket就是为此目的而设计的。在Webflux和WebSocket中使用这种设计是否正确? - Mritunjay
我不认为websocket是最好的解决方案。Socket更适合实时数据。如果你把REST的工作交给socket,那么使用socket会变成一种混乱的代码。 - Ricard Kollcaku
感谢您的回复。 - Mritunjay

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接