Netty - Client/Server Chat


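I need client/server communication using Netty for one of my projects, so I started with a small hands-on exercise to get up to speed. I am learning Netty and am still a beginner.

I tried to build a simple client/server chat with Netty.

Both the client and the server get initialized, and I can see that the server obtains the client's pipeline when the connection is established. But when the client sends a message, it never reaches the messageReceived method of ServerAdapterHandler. Here is my source code:

Client: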

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class ContainerClient {

    String server;
    int port;
    int containerPort;

    public ContainerClient(String server, int port, int containerPort) {
        this.server = server;
        this.port = port;
        this.containerPort = containerPort;
    }

    public static void main(String[] args) {
        String server = "localhost";
        int port = 5252;
        int containerPort = 8094;
        new ContainerClient(server, port, containerPort).start();
    }

    public void start() {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap().group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientAdapterInitializer());

            Channel channel = bootstrap.connect(server, port).sync().channel();

            channel.write("Hi\n");
            channel.write("Hi\n");
            channel.write("Hi\n");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

Client channel initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ClientAdapterInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();

        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        pipeline.addLast("handler", new ClientAdapterHandler());
    }

}

Client message handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;

public class ClientAdapterHandler extends
        ChannelInboundMessageHandlerAdapter<String> {

    @Override
    public void messageReceived(ChannelHandlerContext context, String message)
            throws Exception {
        System.out.println(message);
        if (message.equals("quit"))
            throw new ServerEndedException("Server is closed");
    }

    @Override
    public void channelRead(ChannelHandlerContext arg0, Object arg1)
            throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext arg0)
            throws Exception {
        // TODO Auto-generated method stub

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext arg0)
            throws Exception {
        // TODO Auto-generated method stub

    }

}

Server:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ContainerServer {

    int port;

    public static void main(String[] args) {
        new ContainerServer().start();
    }

    public void start() {
        port = 5252;
        EventLoopGroup producer = new NioEventLoopGroup();
        EventLoopGroup consumer = new NioEventLoopGroup();

        try {

            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(producer, consumer)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerAdapterInitializer());
            System.out.println("Server started");
            bootstrap.bind(port).sync().channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdownGracefully();
            consumer.shutdownGracefully();
        }

    }

}

Server channel initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ServerAdapterInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();

        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        pipeline.addLast("handler", new ServerAdapterHandler());
    }

}

Server message handler:

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class ServerAdapterHandler extends
        ChannelInboundMessageHandlerAdapter<String> {

    private static final ChannelGroup channels = new DefaultChannelGroup(
            "containers", GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[START] New Container has been initialzed");
        channels.add(ctx.channel());
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[END] A Container has been removed");
        channels.remove(ctx.channel());
        super.handlerRemoved(ctx);
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, String arg1)
            throws Exception {
        Channel currentChannel = ctx.channel();
        System.out.println("[INFO] - " + currentChannel.remoteAddress() + " - "
                + arg1);
        currentChannel.write("[Server] - Success");

    }

    @Override
    public boolean beginMessageReceived(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("Message received");
        return super.beginMessageReceived(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext arg0, Object arg1)
            throws Exception {
        System.out.println("channelRead");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext arg0)
            throws Exception {
        // TODO Auto-generated method stub
        System.out.println("channelReadComplete");
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext arg0)
            throws Exception {
        // TODO Auto-generated method stub
        System.out.println("channelWritabilityChanged");
    }

}

Here is the output I get on the server, while the client prints nothing:
Server started
[START] New Container has been initialzed
channelReadComplete
[END] A Container has been removed

But the expected output is:
Server started
[START] New Container has been initialzed
channelReadComplete
[INFO] - localhost - Hi
[INFO] - localhost - Hi
[INFO] - localhost - Hi
[END] A Container has been removed

And the client should receive the following responses:

[Server] - Success
[Server] - Success
[Server] - Success

I also tried using a line delimiter in the framer, but the result was the same.

Can anyone help with this?

Thanks in advance!


Please create a Minimal, Complete, and Verifiable example of your code. - Lrrr
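Sorry about that. I did try, but I'm not sure where the error is, which is why I posted all of my classes. - Praveen
Which Netty version are you using? My guess is that you need to flush the channel after writing. - Moh-Aw
Yes, I tried that too, but it didn't help. The problem is that even my client's first message never reaches the server. Failing that, do you know of a good blog post on io.netty? I'm using Netty 4.0.0. - Praveen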
1 Answer


I made the following change to the start method of your ContainerClient, and it now works for me. Just add a channel.flush() after the writes:

public void start() {
    EventLoopGroup group = new NioEventLoopGroup();

    try {
        Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ClientAdapterInitializer());

        Channel channel = bootstrap.connect(server, port).sync().channel();

        channel.write("Hi\n");
        channel.write("Hi\n");
        channel.write("Hi\n");
        channel.flush();

    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}
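Why the flush matters: in Netty 4.0, write() only queues a message in the channel's outbound buffer, and nothing goes out on the socket until flush() (or writeAndFlush()) is called. The sketch below is only an analogy built on plain JDK streams, not Netty code. The class name FlushAnalogy and the method writeThenFlush are made up for illustration; the ByteArrayOutputStream stands in for the socket and the BufferedOutputStream for the pending-write queue.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FlushAnalogy {

    // Returns {bytes that reached the sink before flush, bytes after flush}.
    public static int[] writeThenFlush(String... messages) throws IOException {
        // Stand-in for the socket: records every byte that is really "sent".
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Stand-in for the pending-write queue: holds data until flushed.
        BufferedOutputStream buffered = new BufferedOutputStream(sink);

        for (String message : messages) {
            buffered.write(message.getBytes(StandardCharsets.UTF_8));
        }
        // The small messages are still sitting in the buffer at this point.
        int beforeFlush = sink.size();

        buffered.flush();
        int afterFlush = sink.size();

        return new int[] {beforeFlush, afterFlush};
    }

    public static void main(String[] args) throws IOException {
        int[] sizes = writeThenFlush("Hi\n", "Hi\n", "Hi\n");
        // prints "before flush: 0, after flush: 9"
        System.out.println("before flush: " + sizes[0] + ", after flush: " + sizes[1]);
    }
}
```

In the Netty client, calling channel.writeAndFlush("Hi\n") for each message should have the same effect as the three write() calls followed by one flush().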

ChannelInboundMessageHandlerAdapter no longer exists in newer 4.0 releases, so I used SimpleChannelInboundHandler instead:

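import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ServerAdapterHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel currentChannel = ctx.channel();
        System.out.println("[INFO] - " + currentChannel.remoteAddress() + " - " + msg);
        // write() alone only queues the reply; writeAndFlush() also sends it to the client.
        currentChannel.writeAndFlush("[Server] - Success");
    }

}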

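It looks like I shut the group down immediately after the client sends its messages. If I want the server's reply, I probably need a loop or a sleep there to wait for the response. - Praveen
If you want to keep the channel alive, move the shutdownGracefully() call into a separate method and call it only when you actually want to close the channel. - Moh-Aw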
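
The point of the last two comments is the order of operations on the client: send, flush, wait for the reply, and only then shut down. As a rough illustration, here is the same exchange written with plain JDK blocking sockets rather than Netty. The class BlockingChatSketch and its methods are hypothetical names, and the throwaway in-process server exists only so the sketch runs on its own.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class BlockingChatSketch {

    // Sends one line to a one-shot local server and returns the server's reply.
    public static String roundTrip(String message) throws Exception {
        // Port 0 lets the OS pick a free port on the loopback interface.
        try (ServerSocket listener = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread server = new Thread(() -> serveOnce(listener));
            server.start();

            String reply;
            try (Socket socket = new Socket(listener.getInetAddress(), listener.getLocalPort())) {
                BufferedWriter out = writer(socket);
                BufferedReader in = reader(socket);
                out.write(message + "\n");
                out.flush();            // push the buffered line onto the socket
                reply = in.readLine();  // block until the reply arrives
            }                           // the connection is closed only after the reply was read
            server.join();
            return reply;
        }
    }

    // Accepts a single connection, logs one line, and answers it.
    private static void serveOnce(ServerSocket listener) {
        try (Socket socket = listener.accept()) {
            String line = reader(socket).readLine();
            System.out.println("[INFO] - " + socket.getRemoteSocketAddress() + " - " + line);
            BufferedWriter out = writer(socket);
            out.write("[Server] - Success\n");
            out.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static BufferedReader reader(Socket socket) throws IOException {
        return new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
    }

    private static BufferedWriter writer(Socket socket) throws IOException {
        return new BufferedWriter(
                new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("Hi"));
    }
}
```

In the Netty client the equivalent would be to keep group.shutdownGracefully() out of the finally block of start() and call it from a separate method once the replies have been received, or after waiting on channel.closeFuture().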
