使用Jersey SSE进行广播:检测连接关闭

9
我认为这个问题不是Server sent event with Jersey: EventOutput is not closed after client drops的重复,但可能与Jersey Server-Sent Events - write to broken connection does not throw exception相关。

在Jersey文档的第15.4.2章节中,描述了SseBroadcaster:

然而,SseBroadcaster内部也识别并处理客户端断开连接。当客户端关闭连接时,广播器检测到这一情况,并将陈旧的连接从已注册的EventOutputs的内部集合中删除,同时释放与陈旧连接相关联的所有服务器端资源。

我无法确认这一点。在以下测试用例中,我看到子类化的SseBroadcasteronClose()方法从未被调用:不会在EventInput关闭时,也不会在广播另一条消息时。

public class NotificationsResourceTest extends JerseyTest {
    final static Logger log = LoggerFactory.getLogger(NotificationsResourceTest.class);

    final static CountingSseBroadcaster broadcaster = new CountingSseBroadcaster();

    public static class CountingSseBroadcaster extends SseBroadcaster { 
        final AtomicInteger connectionCounter = new AtomicInteger(0);

        public EventOutput createAndAttachEventOutput() {
            EventOutput output = new EventOutput();
            if (add(output)) {
                int cons = connectionCounter.incrementAndGet();
                log.debug("Active connection count: "+ cons);
            }
            return output;
        }

        @Override
        public void onClose(final ChunkedOutput<OutboundEvent> output) {
            int cons = connectionCounter.decrementAndGet();
            log.debug("A connection has been closed. Active connection count: "+ cons);
        }

        @Override
        public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) {
            log.trace("An exception has been detected", exception);
        }

        public int getConnectionCount() {
            return connectionCounter.get();
        }
    }

    @Path("notifications")
    public static class NotificationsResource {

        @GET
        @Produces(SseFeature.SERVER_SENT_EVENTS)
        public EventOutput subscribe() {
            log.debug("New stream subscription");

            EventOutput eventOutput = broadcaster.createAndAttachEventOutput();
            return eventOutput;
        }
    }   

    @Override
    protected Application configure() {
        ResourceConfig config = new ResourceConfig(NotificationsResource.class);
        config.register(SseFeature.class);

        return config;
    }


    @Test
    public void test() throws Exception {
        // check that there are no connections
        assertEquals(0, broadcaster.getConnectionCount());

        // connect subscriber
        log.info("Connecting subscriber");
        EventInput eventInput = target("notifications").request().get(EventInput.class);
        assertFalse(eventInput.isClosed());

        // now there are connections
        assertEquals(1, broadcaster.getConnectionCount());

        // push data
        log.info("Broadcasting data");
        String payload = UUID.randomUUID().toString();
        OutboundEvent chunk = new OutboundEvent.Builder()
                .mediaType(MediaType.TEXT_PLAIN_TYPE)
                .name("message")
                .data(payload)
                .build();
        broadcaster.broadcast(chunk);

        // read data
        log.info("Reading data");
        InboundEvent inboundEvent = eventInput.read();
        assertNotNull(inboundEvent);
        assertEquals(payload, inboundEvent.readData());

        // close subscription 
        log.info("Closing subscription");
        eventInput.close();
        assertTrue(eventInput.isClosed());

        // at this point, the subscriber has disconnected itself, 
        // but jersey doesnt realise that
        assertEquals(1, broadcaster.getConnectionCount());

        // wait, give TCP a chance to close the connection
        log.debug("Sleeping for some time");
        Thread.sleep(10000);

        // push data again, this should really flush out the not-connected client
        log.info("Broadcasting data again");
        broadcaster.broadcast(chunk);
        Thread.sleep(100);

        // there is no subscriber anymore
        assertEquals(0, broadcaster.getConnectionCount());  // FAILS!
    }
}

也许JerseyTest不是测试这个的好方法。在一个不那么严格的环境下,使用JavaScript EventSource时,我看到onClose()被调用,但只有在先前关闭连接后广播消息之后才会发生。
我做错了什么?
为什么SseBroadcaster无法检测到客户端关闭连接?
跟进:
我找到了JERSEY-2833,它因“按设计工作”而被拒绝:
根据SSE章节中的Jersey文档(https://jersey.java.net/documentation/latest/sse.html)中15.4.1的说明,Jersey不会明确地关闭连接,这是资源方法或客户端的责任。
那是什么意思?资源是否应强制执行超时并结束所有活动和由客户端关闭的连接?
3个回答

2
在构造函数org.glassfish.jersey.media.sse.SseBroadcaster.SseBroadcaster()的文档中,它说:
创建一个新实例。如果该构造函数由子类调用,则假定子类存在的原因是实现onClose(org.glassfish.jersey.server.ChunkedOutput)onException(org.glassfish.jersey.server.ChunkedOutput, Exception)方法,因此将新创建的实例添加为侦听器。为了避免这种情况,子类可以调用SseBroadcaster(Class)并传递它们的类作为参数。
所以你不应该保留默认构造函数,应试着使用你的构造函数调用超类:
```java public MySseBroadcaster() { super(MySseBroadcaster.class); } ```
public CountingSseBroadcaster(){
    super(CountingSseBroadcaster.class);
}

0

我认为最好在你的资源上设置一个超时时间,然后只关闭那个连接,例如:

@Path("notifications")
public static class NotificationsResource {

    @GET
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput subscribe() {
        log.debug("New stream subscription");

        EventOutput eventOutput = broadcaster.createAndAttachEventOutput();
        new Timer().schedule( new TimerTask()
        {
            @Override public void run()
            {
               eventOutput.close()
            }
        }, 10000); // 10 second timeout
        return eventOutput;
    }
}   

感谢您的输入。我相信这种方法可以行得通,并且它有其优点,但这不是我想要的。我目前的解决方法是每隔x秒发送一个SSE注释,这将最终清除断开的连接。然而,原始问题仍然存在-为什么Jersey不能检测到关闭的连接并自行报告呢? - Hank

0

我在想,通过子类化是否可能改变了行为。

    @Override
    public void onClose(final ChunkedOutput<OutboundEvent> output) {
        int cons = connectionCounter.decrementAndGet();
        log.debug("A connection has been closed. Active connection count: "+ cons);
    }

在这里,您没有关闭ChunkedOutput,因此它不会释放连接。这可能是问题所在吗?

不,那与此无关。事实上,情况恰恰相反,当广播器关闭“OutboundEvent”时,“onClose()”被调用。 - Hank

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接