Netty 4 SSE 更新事件

5

我正在尝试创建一个基于Netty的服务器,以便在客户端上使用SSE规范。首先,我创建了一个扩展自SimpleChannelInboundHandler的处理程序(NotifyHandler),并从我的自有Pub系统进行扩展。当通知到达onNotificationRecibed时,它会被写入上下文输出通道。

private ChannelHandlerContext context = null;
private Publisher p = null;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    super.channelRead(ctx, msg);
    this.context = ctx;
    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
            HttpResponseStatus.OK);
    HttpHeaders headers = response.headers();
    headers.set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
    headers.set(HttpHeaders.Names.CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
    headers.set(HttpHeaders.Names.PRAGMA, HttpHeaders.Values.NO_CACHE);
    headers.set(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
    ctx.writeAndFlush(response);
    Pub.getInstance().suscribe(this);
}

@Override
public void onNotificationRecibed(String type, Map<String, Object> data) {
    context.writeAndFlush("event:"+type);
    context.writeAndFlush("data:"+data.toString());
    context.flush();
}

在初始化器中:

public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new HttpRequestDecoder());
    pipeline.addLast(new HttpResponseEncoder());
    pipeline.addLast(new NotifyHandler());
}

我无法使其正常工作,已经尝试查找一些有关该流的示例或用法,但似乎都不起作用。是否有人能指导我正确的方向?对于我的英语表达,十分抱歉。感谢您的时间。


你能否提供更多关于“不起作用”的细节?同时,你应该向writeAndFlush()返回的ChannelFuture添加一个ChannelFutureListener,并查看是否失败。 - Norman Maurer
谢谢Norman,问题在于客户端没有接收到任何数据。我会尝试使用ChannelFutureListener并告诉你我的结果。 - tbp
2个回答

2
除了 @MPazik 的建议外,由于您正在使用 HttpResponseEncoder,您需要将 HttpContent 写入到 ChannelHandlerContext 中。例如:
@Override
public void onNotificationRecibed(String type, Map<String, Object> data) {

    final StringBuilder msg = new StringBuilder(1024); // 1kb
    msg.append("event:").append(type).append("\n");
    msg.append"data:").append(data.toString().append("\n\n"));

    final ByteBuf buffer = Unpooled.copiedBuffer(msg.toString(), StandardCharsets.UTF_8);

    context.writeAndFlush(new DefaultHttpContent(buffer));    
}

所有上述的类都是标准JDK或Netty提供的。

1

我曾经遇到过同样的问题。正如规范中所描述的那样,每个字段都必须用新行分隔,每个消息之间也要用另一条新行分隔。

@Override
public void onNotificationRecibed(String type, Map<String, Object> data) {
    context.writeAndFlush("event:" + type + "\n");
    context.writeAndFlush("data:" + data.toString() + "\n\n");
    context.flush();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接