Jersey/JAX-RS 2异步响应 - 如何跟踪当前长轮询的调用方

3
我的目标是支持多个Web服务调用方的长轮询,并跟踪当前“停放”在长轮询上的调用方(即已连接)。通过“长轮询”,我指的是调用方调用Web服务,服务器(Web服务)不立即返回,而是让调用方等待一段预设时间(在我的应用程序中为一小时),或者如果服务器有消息要发送给调用方,则更早地返回(在这种情况下,服务器通过调用asyncResponse.resume(“MESSAGE”)返回消息)。
我将把它分成两个问题。
第一个问题:这是“停放”正在进行长轮询的调用方的合理方法吗?
@GET
@Produces(MediaType.TEXT_PLAIN)
@ManagedAsync
@Path("/poll/{id}")
public Response poller(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String callerId) {

    // add this asyncResponse to a HashMap that is persisted across web service calls by Jersey.
    // other application components that may have a message to send to a caller will look up the
    // caller by callerId in this HashMap and call resume() on its asyncResponse.
    callerIdAsyncResponseHashMap.put(callerId, asyncResponse);

    asyncResponse.setTimeout(3600, TimeUnit.SECONDS);
    asyncResponse.setTimeoutHandler(new TimeoutHandler() {
        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            asyncResponse.resume(Response.ok("TIMEOUT").build());
        }
    });
    return Response.ok("COMPLETE").build();
}

这段代码可以正常运行,只是我不确定它是否遵循最佳实践。在方法末尾出现“return Response...”这一行似乎很奇怪。虽然该行代码在调用者第一次连接时执行,但据我所知,实际上从未将“COMPLETE”结果返回给调用者。当服务器需要通知调用者发生事件时,调用者会通过asyncResponse.resume()接收到“TIMEOUT”响应或其他响应消息。
第二个问题:我的当前挑战是准确反映HashMap中当前轮询的调用者数量。当一个调用者停止轮询时,我需要从HashMap中删除它的条目。有三种原因导致调用者离开:1)3600秒已过,因此超时;2)另一个应用程序组件查找HashMap中的调用者并调用asyncResponse.resume("MESSAGE");3)HTTP连接由于某些原因(例如关闭运行客户端应用程序的计算机)而中断。
因此,JAX-RS有两个回调函数可注册以通知连接结束:CompletionCallback(用于我的终止轮询原因#1和#2),以及ConnectionCallback(用于我的终止轮询原因#3)。我可以像这样将它们添加到我的Web服务方法中:
@GET
@Produces(MediaType.TEXT_PLAIN)
@ManagedAsync
@Path("/poll/{id}")
public Response poller(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String callerId) {

    asyncResponse.register(new CompletionCallback() {
        @Override
        public void onComplete(Throwable throwable) {
            //?
        }
    });

    asyncResponse.register(new ConnectionCallback() {
        @Override
        public void onDisconnect(AsyncResponse disconnected) {
            //?
        }
    });

    // add this asyncResponse to a HashMap that is persisted across web service calls by Jersey.
    // other application components that may have a message to send to a caller will look up the
    // caller by callerId in this HashMap and call resume() on its asyncResponse.
    callerIdAsyncResponseHashMap.put(callerId, asyncResponse);

    asyncResponse.setTimeout(3600, TimeUnit.SECONDS);
    asyncResponse.setTimeoutHandler(new TimeoutHandler() {
        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            asyncResponse.resume(Response.ok("TIMEOUT").build());
        }
    });
    return Response.ok("COMPLETE").build();
}

挑战在于利用这两个回调函数从HashMap中删除不再轮询的调用者。其中ConnectionCallback实际上更容易处理。由于它接收一个asyncResponse实例作为参数,我可以使用它来从HashMap中删除相应的条目,如下所示:

asyncResponse.register(new ConnectionCallback() {
    @Override
    public void onDisconnect(AsyncResponse disconnected) {
        Iterator<Map.Entry<String, AsyncResponse>> iterator = callerIdAsyncResponseHashMap.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, AsyncResponse> entry = iterator.next();
            if (entry.getValue().equals(disconnected)) {
                iterator.remove();
                break;
            }
        }
    }
});

对于CompletionCallback而言,在回调被触发时,由于asyncResponse已经完成或取消了,因此不会传递asyncResponse参数。因此,似乎唯一的解决方案是遍历HashMap条目,检查是否已完成/取消,并将其删除,如下所示。(请注意,我不需要知道调用者是因为调用了resume()还是超时而离开,因此不看"throwable"参数)。

asyncResponse.register(new CompletionCallback() {
    @Override
    public void onComplete(Throwable throwable) {
        Iterator<Map.Entry<String, AsyncResponse>> iterator = callerIdAsyncResponseHashMap.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, AsyncResponse> entry = iterator.next();
            if (entry.getValue().isDone() || entry.getValue().isCancelled()) {
                iterator.remove();
            }
        }
    }
});

欢迎提供任何反馈意见。这种方法看起来合理吗?是否有更好的Jersey/JAX-RS方法来实现它?

1个回答

0

您的poller()方法不需要返回Response就可以参与异步处理。它可以返回void。但是,如果您在poller中执行任何复杂操作,则应考虑将整个方法包装在try/catch块中,并使用异常恢复AsyncResponse对象,以确保不会丢失任何RuntimeExceptions或其他未经检查的Throwables。在此处的catch块中记录这些异常似乎也是一个好主意。

我目前正在研究如何可靠地捕获客户端取消异步请求的问题,并阅读了一个问题,该问题表明机制对于提问者并不起作用[1]。我将让其他人暂时填写此信息。

[1] Jersey中的AsyncResponse ConnectionCallback不触发


感谢你的输入。是的,你给出链接的另一个问题也是我发起的 :) 顺便说一下,由于ConnectionCallback没有触发的问题,我刚刚从长轮询转移到了服务器发送事件(SSE)。似乎对我们来说更合适。我们拭目以待。 - ricb

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接