如何获取现有的Websocket实例

15

我正在开发一个应用程序,使用Websockets(Java EE 7)异步地向所有连接的客户端发送消息。每当创建新文章(我的应用中的一种参与模式)时,服务器(Websocket端点)都应该发送这些消息。

每次建立到Websocket端点的连接时,我都将相应的会话添加到一个列表中,以便在外部可以访问。

但我遇到的问题是,当我从外部访问连接到此端点的所有客户端的已创建Websocket端点时(任何其他业务类),我会得到现有实例(类似于单例)。

所以,请您建议我一种获取Websocket端点现有实例的方法,因为我无法像创建new MyWebsocketEndPoint()一样创建它,因为它将由Websocket内部机制在接收来自客户端的请求时创建。

供参考:

private static WebSocketEndPoint INSTANCE = null;

public static WebSocketEndPoint getInstance() {
if(INSTANCE == null) {
// Instead of creating a new instance, I need an existing one
    INSTANCE = new WebSocketEndPoint ();
}
        return INSTANCE;
}

提前致谢。

2个回答

23

容器为每个客户端连接创建了单独的端点实例,因此您无法做您试图做的事情。但我想您试图做的是在事件发生时向所有活动客户端连接发送消息,这很简单。

javax.websocket.Session类具有getBasicRemote方法,可检索表示与该会话关联的端点的RemoteEndpoint.Basic实例。

您可以通过调用Session.getOpenSessions()来检索所有打开的会话,然后遍历它们。循环将向每个客户端连接发送一条消息。以下是一个简单的示例:

@ServerEndpoint("/myendpoint")
public class MyEndpoint {
  @OnMessage
  public void onMessage(Session session, String message) {
    try {  
      for (Session s : session.getOpenSessions()) {
        if (s.isOpen()) {
          s.getBasicRemote().sendText(message);
        }
    } catch (IOException ex) { ... }
  } 
} 

但在你的情况下,你可能想使用CDI事件来触发对所有客户端的更新。在这种情况下,你需要创建一个CDI事件,然后在你的Websocket端点类中观察该方法:

@ServerEndpoint("/myendpoint")
public class MyEndpoint {
  // EJB that fires an event when a new article appears
  @EJB
  ArticleBean articleBean;
  // a collection containing all the sessions
  private static final Set<Session> sessions = 
          Collections.synchronizedSet(new HashSet<Session>());

  @OnOpen
  public void onOpen(final Session session) {
    // add the new session to the set
    sessions.add(session);
    ...
  }

  @OnClose
  public void onClose(final Session session) {
    // remove the session from the set
    sessions.remove(session);
  }

  public void broadcastArticle(@Observes @NewArticleEvent ArticleEvent articleEvent) {
    synchronized(sessions) {
      for (Session s : sessions) {
        if (s.isOpen()) {
          try {
            // send the article summary to all the connected clients
            s.getBasicRemote().sendText("New article up:" + articleEvent.getArticle().getSummary());
          } catch (IOException ex) { ... }
        }
      }
    }
  }
}

在上面的例子中,EJB 将会执行以下操作:

...
@Inject
Event<ArticleEvent> newArticleEvent;

public void publishArticle(Article article) {
  ...
  newArticleEvent.fire(new ArticleEvent(article));
  ...
}

请参阅Java EE 7教程中关于WebSocketsCDI事件的章节。

编辑:修改@Observer方法以使用事件作为参数。

编辑2:在broadcastArticle循环中使用synchronized进行了包装,按照@gcvt的建议。

编辑3:更新了Java EE 7教程的链接。做得好,Oracle。嘘。


上面的例子只是一个通用的架构。您不必使用EJB。在您的情况下,“ArticleHandler”将注入事件并触发它。至于访问会话,将“Set<Session>”集合移动到单例中,并从您的端点类调用该单例以将会话存储在那里。然后,单例将具有“getSessions()”方法来检索当前会话。 - Ian Evans
2
broadcastArticle 方法中,在 for 循环之前应该加上 synchronized(sessions) - gcvt
Java EE 7的链接是404。 - GoZoner
@GoZoner 修复了链接。 - Ian Evans
1
@DonRhummy 是的,假设您的容器支持CDI,您应该能够这样做。 - Ian Evans
显示剩余5条评论

9
实际上,WebSocket API 提供了一种控制端点实例化的方式。请参见 https://tyrus.java.net/apidocs/1.2.1/javax/websocket/server/ServerEndpointConfig.Configurator.html 简单示例(取自Tyrus - WebSocket RI测试):
    public static class MyServerConfigurator extends ServerEndpointConfig.Configurator {

        public static final MyEndpointAnnotated testEndpoint1 = new MyEndpointAnnotated();
        public static final MyEndpointProgrammatic testEndpoint2 = new MyEndpointProgrammatic();

        @Override
        public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
            if (endpointClass.equals(MyEndpointAnnotated.class)) {
                return (T) testEndpoint1;
            } else if (endpointClass.equals(MyEndpointProgrammatic.class)) {
                return (T) testEndpoint2;
            }

            throw new InstantiationException();
        }
    }

您需要将此注册到一个端点:
@ServerEndpoint(value = "/echoAnnotated", configurator = MyServerConfigurator.class)
public static class MyEndpointAnnotated {

    @OnMessage
    public String onMessage(String message) {

        assertEquals(MyServerConfigurator.testEndpoint1, this);

        return message;
    }
}

或者您也可以在编程端点中使用它:
public static class MyApplication implements ServerApplicationConfig {
    @Override
    public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> endpointClasses) {
        return new HashSet<ServerEndpointConfig>
          (Arrays.asList(ServerEndpointConfig.Builder
            .create(MyEndpointProgrammatic.class, "/echoProgrammatic")
            .configurator(new MyServerConfigurator())
            .build()));
    }

    @Override
    public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> scanned) {
        return new HashSet<Class<?>>(Arrays.asList(MyEndpointAnnotated.class));
    }

当然,你可以选择使用一个配置器用于所有端点(如所示片段中的丑陋if语句),或为每个端点创建单独的配置器。
请不要直接复制所示代码-这只是Tyrus测试的一部分,并且违反了一些基本的OOM范例。
请参见https://github.com/tyrus-project/tyrus/blob/1.2.1/tests/e2e/src/test/java/org/glassfish/tyrus/test/e2e/GetEndpointInstanceTest.java以获取完整的测试。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接