向入站通道处理程序管道中注入异步合成的消息

4
我有一个netty客户端从TCP连接中读取消息,并希望将合成消息添加到管道中,以便像普通消息一样由下游处理程序处理。
这些消息将使用定期计时器每秒注入一次,即这些消息与传入消息异步,因此我必须手动调用管道,而不是等待tcp流量,但肯定需要同步,以便管道处理程序不会并发调用。
在下面的简单示例管道中,消息必须在1之后插入(否则帧解码器会感到困惑),但在2之前插入(因为它应该处理合成消息以及常规消息)。

sample pipeline

如何完成这个任务?
1个回答

3
您可以通过获取要注入合成消息的上游处理程序的ChannelHandlerContext的引用来实现此操作。当您想要插入消息时,调用上下文的fireChannelRead(Object)方法,该方法会调用管道中下一个入站处理程序的channelRead方法。
Netty将确保在正确的线程中处理消息。(如果您已经在通道的事件循环线程中,则立即调用下一个处理程序;否则,在下一个可用机会时将调度调用以在通道的事件循环线程上发生)。
在您的情况下,您需要frame-decoder处理程序的上下文。 ChannelPipeline有一些重载的“context”方法,返回处理程序的上下文。在此示例中,假设您已经给处理程序命名为“frame-decoder”,我们将使用这个名称查找上下文。(或者,您可以通过传递处理程序对象本身的引用或传递处理程序的Class来查找它。)
public static void injectMessage(ChannelPipeline pipeline,
                                 Object message) {
    ChannelHandlerContext ctx = pipeline.context("frame-decoder");
    ctx.fireChannelRead(message);
}

1
在仔细查看了Netty代码之后,很明显Netty会在正确的线程中触发“channel read”事件(使用非常类似于此答案初始版本的代码),因此您不需要担心这一部分。 - dnault
1
原来你甚至不需要创建自定义处理程序,因为你可以从管道中获取任何处理程序的上下文。 - dnault
运行得非常好。感谢您的跟进和简化 - 它也简化了我的实际代码! - Evgeniy Berezovsky

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接