如何使用Netty客户端获取服务器响应

16

我想编写一个基于Netty的客户端。它应该有一个方法public String send(String msg);,该方法应返回来自服务器或某个未来的响应 - 不重要。此外,它应该是多线程的。像这样:

public class Client {
public static void main(String[] args) throws InterruptedException {
    Client client = new Client();

}

private Channel channel;

public Client() throws InterruptedException {
    EventLoopGroup loopGroup = new NioEventLoopGroup();

    Bootstrap b = new Bootstrap();
    b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder()).
                    addLast(new StringEncoder()).
                    addLast(new ClientHandler());
        }
    });
    channel = b.connect("localhost", 9091).sync().channel();
}

public String sendMessage(String msg) {
    channel.writeAndFlush(msg);
    return ??????????;
}

}

我不明白在调用writeAndFlush()之后如何从服务器检索响应,我该怎么做?

另外,我使用的是Netty 4.0.18.Final版本。

4个回答

16

返回一个 Future<String> 对象很简单,我们将实现以下方法签名:

public Futute<String> sendMessage(String msg) {

如果您熟悉异步编程结构,那么这个问题相对容易解决。为了解决设计问题,我们将采取以下步骤:

  1. 当写入消息时,向 ArrayBlockingQueue<Promise> 添加一个 Promise<String>

    这将作为最近发送的消息列表,并允许我们更改返回结果的 Future<String> 对象。

  2. 当消息返回到处理程序时,将其与 Queue 的头部进行解析

    这使我们可以获取正确的 future 来进行更改。

  3. 更新 Promise<String> 的状态

    我们调用 promise.setSuccess() 最终在对象上设置状态,这将传播回 future 对象。

示例代码

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    private ChannelHandlerContext ctx;
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        super.channelInactive(ctx);
        synchronized(this){
            Promise<String> prom;
            while((prom = messageList.poll()) != null) 
                prom.setFailure(new IOException("Connection lost"));
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if(ctx == null) 
            throw new IllegalStateException();
        return sendMessage(message, ctx.executor().newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized(this){
            if(messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException());
            } else if(messageList.offer(prom)) { 
                // Connection open and message accepted
                ctx.writeAndFlush(message).addListener();
            } else { 
                // Connection open and message rejected
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) {
        synchronized(this){
            if(messageList != null) {
                 messageList.poll().setSuccess(msg);
            }
        }
    }
}

文档细节

  • private ChannelHandlerContext ctx;

    用于存储我们对ChannelHandlerContext的引用,这样我们就可以创建Promise。

  • private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>();

    我们将过去的消息保存在此列表中,以便我们可以更改未来的结果。

  • public void channelActive(ChannelHandlerContext ctx)

    当连接变为活动状态时,由Netty调用。在此处初始化我们的变量。

  • public void channelInactive(ChannelHandlerContext ctx)

    当连接变为非活动状态时,由Netty调用,可能是由于错误或正常连接关闭。

  • protected void messageReceived(ChannelHandlerContext ctx, String msg)

    当新消息到达时,由Netty调用,在此处挑选队列的头部,然后我们调用它的setsuccess。

警告建议

当使用Future时,有一件事情需要注意,如果Future还没有完成,请不要从Netty线程中调用get()方法。不遵循这个简单的规则将会导致死锁或者BlockingOperationException


4
有两个注意事项:1)所使用的协议必须保证服务器按照接收请求的顺序发送响应;2)只能向单个服务器发送和接收请求(否则,由于各个服务器之间的排序可能不再同步,第一个注意事项将会出现问题)。由于单个引导程序可以用于连接多个服务器,因此第二个注意事项可能是一个问题,尽管每个连接将产生自己的通道,因此假定(1)成立,应该可以为每个通道设置单独的队列以解决这个问题。 - Jonathan
内存可见性怎么样——在channelActive中分配ctx是否保证被调用sendMessage(String)的线程看到? - dnault
还有其他人遇到这个错误吗?在 return sendMessage(message, ctx.newPromise()); 中,newPromise 的类型是 io.netty.channel.ChannelPromise,但所需的类型是 io.netty.util.concurrent.Promise<String>。经过一些强制转换后,我得到了 ClassCastException: java.lang.String cannot be cast to java.lang.Void - Dawid Fieluba
我在return sendMessage(message, ctx.newPromise());这一行也遇到了相同的编译错误。 - mertaksu
@mertaksu 应该是 ctx.executor().newPromise() 而不是 ctx.newPromise(),我不确定旧代码第一次是如何编译的,可能是我使用了不同版本的Netty。 - Ferrybig
看起来是一个不错的答案,但我没有看到 ctx.writeAndFlush(message).addListener(); 实际上有什么作用(也许自从这篇文章写出来以后 API 就改变了,但是没有空的 addListener() 方法,而且这个分支对 Promise 没有任何影响。 :-( - cjstehno

4
你可以在netty项目中找到示例。 我们可以将结果保存到最后一个handler的自定义字段中。在下面的代码中,我们需要的是handler.getFactorial()。
参考http://www.lookatsrc.com/source/io/netty/example/factorial/FactorialClient.java?a=io.netty:netty-all FactorialClient.java
public final class FactorialClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
    static final int COUNT = Integer.parseInt(System.getProperty("count", "1000"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new FactorialClientInitializer(sslCtx));

            // Make a new connection.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Get the handler instance to retrieve the answer.
            FactorialClientHandler handler =
                (FactorialClientHandler) f.channel().pipeline().last();

            // Print out the answer.
            System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial());
        } finally {
            group.shutdownGracefully();
        }
    }
}

public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {

    private ChannelHandlerContext ctx;
    private int receivedMessages;
    private int next = 1;
    final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();

    public BigInteger getFactorial() {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException ignore) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        sendNumbers();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
        receivedMessages ++;
        if (receivedMessages == FactorialClient.COUNT) {
            // Offer the answer after closing the connection.
            ctx.channel().close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    boolean offered = answer.offer(msg);
                    assert offered;
                }
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private void sendNumbers() {
        // Do not send more than 4096 numbers.
        ChannelFuture future = null;
        for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
            future = ctx.write(Integer.valueOf(next));
            next++;
        }
        if (next <= FactorialClient.COUNT) {
            assert future != null;
            future.addListener(numberSender);
        }
        ctx.flush();
    }

    private final ChannelFutureListener numberSender = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                sendNumbers();
            } else {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    };
}

0

调用channel.writeAndFlush(msg);已经返回了一个ChannelFuture。为了处理这个方法调用的结果,你可以像这样向future添加一个监听器:

future.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) {
        // Perform post-closure operation
        // ...
    }
}); 

(这是从Netty文档中提取的,请参见:Netty文档


3
如何从ChannelFuture获取服务器响应? - Moses
你需要向通道注册一个ChannelInboundHandler。实际上,你可能已经做过了 -> 请查看你的ClientHandler。这个处理器可以实现一个public void channelRead(ChannelHandlerContext ctx, Object msg) {...}方法。它处理来自服务器的响应。 - Teots
1
我了解Netty的基础知识。在服务器上,它很简单明了。但我仍然不明白如何将这段代码连接起来:public String sendMessage(String msg) { channel.writeAndFlush(msg); return ??????????; } 与channelRead(...)。 - Moses
在Netty中,无法通过此方法获取服务器响应。数据(在您的情况下为来自服务器的响应)由连接到通道的InboundHandlers处理。在这些处理程序中,您可以将服务器响应转发到代码的另一部分。始终记住,Netty是一个异步框架! - Teots

0

这里有另一个解决方案,你只需要熟悉Netty使用的异步编程。

下面的解决方案主要使用子Netty通道和LinkedBlockingQueue。

在你的入站处理程序中,

@ChannelHandler.Sharable
public class ClientInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        Attribute<SensibleRelay> relayAttr = channel.attr(ChannelAttributeKeys.RELAY);
        if (null == relayAttr) {
            return;
        }

        FullHttpResponse httpResponse = (FullHttpResponse) msg;
        ByteBuf content = httpResponse.content();
      
        SensibleRelay relay = relayAttr.get();
        boolean offered = relay.offerResponse(content.toString(StandardCharsets.UTF_8));
        assert offered;
    }
}

在你的Netty客户端中,
SensibleRelay relay = new SensibleRelay();
future.addListener(new FutureListener<Channel>() {
    @Override
    public void operationComplete(Future<Channel> f) throws Exception {
        if (f.isSuccess()) {
            Channel channel = f.getNow();
            
            channel.attr(ChannelAttributeKeys.RELAY).set(relay);

            channel.writeAndFlush(request);
        } 
    }
});

return relay.takeResponse();

这里是 SensibleRelay

public class SensibleRelay {

    final BlockingQueue<String> answer = new LinkedBlockingQueue<String>(1);

    public String takeResponse() {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException ignore) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public boolean offerResponse(String response) {
        return answer.offer(response);
    }

}

希望这能帮到你。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接