Netty客户端与服务器消息传输

11

这实际上是我在这里发布的第一篇文章,我已经试图弄清楚这个问题有一段时间了,但最终我还是决定寻求一些关于这个话题的帮助。

所以我有一个客户端和一个服务器,它们都是基于回显客户端/服务器和安全聊天客户端/服务器模型构建的。我对聊天的SSL部分不感兴趣,只使用回显来确保我能够从客户端/服务器接收到响应。我将在本文底部添加所有相关代码。我目前遇到的问题是,在客户端连接后,我可以从服务器向客户端发送消息,但无法在服务器发送初始消息给客户端后从客户端向服务器发送消息。服务器发送的消息为:

Welcome to the server!

客户端传输的信息为:
test

我应该知道我收到了来自客户端的消息,因为它应该回显

[You] test

我知道服务器可以看到客户端并向我发送状态更新,但出于某种原因我无法向服务器发送消息。现在有一个问题...如果你正在制作一个游戏(这就是我在做的事情),并且你将拥有像登录、玩家移动、世界更新等东西...那么发送字符串是最好的方法吗?我知道很多人使用字节流,在我的编程课程中我们也接触过操作字节流,但我仍然不完全熟悉它们。如果字节流是更好/最好的方法,那么可以有人详细解释一下如何操作字节流以处理不同的项目吗?
如前所述,这是客户端的开始。
public class Client {

public Client() {
    // Initialize the window
    GameWindow.init();
    // Initialize the server connection
    ClientHandler.init();
}

public static void main(String[] args) throws Exception {

    // Set a default server address if one isn't specified in the arguments
    if (args.length < 2 || args.length > 3) {
        System.err.println("Usage: " + Client.class.getSimpleName() + " <host> <port> [<first message size>]");
        System.err.println("Using default values.");
    } else {
        // Parse arguments
        Settings.host = args[0];
        Settings.port = Integer.parseInt(args[1]);
    }

    // start client
    new Client();
}

客户端处理器:

package simple.client.net;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import simple.client.Settings;

public class ClientHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(ClientHandler.class.getName());

public static Channel channel;

public ClientHandler() {
}

public static void init() {
    // Configure the client.
    ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

    // Set up the pipeline factory.
    bootstrap.setPipelineFactory(new ClientPipelineFactory());

    // Start the connection attempt.
    ChannelFuture future = bootstrap.connect(new InetSocketAddress(Settings.host, Settings.port));

    // Wait until the connection is closed or the connection attempt fails.
    channel = future.awaitUninterruptibly().getChannel();

    // This is where the test write is <<------
    ChannelFuture test = channel.write("test");

    if (!future.isSuccess()) {
        future.getCause().printStackTrace();
        bootstrap.releaseExternalResources();
        return;
    }
}

@Override
public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Bound: " + e.getChannel().isBound());
}

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Connected: " + e.getChannel().isConnected());
    System.out.println("Connected: " + e.getChannel().getRemoteAddress());
}

@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Closed: " + e.getChannel());
}

@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Disconnected: " + e.getChannel());
}

@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Open: " + e.getChannel().isOpen());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    System.out.println("Error: " + e.getCause());
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    System.out.println("Message: " + e.getMessage());
}
}

最后是客户端管道(ClientPipeline):
package simple.client.net;

import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

public class ClientPipelineFactory implements ChannelPipelineFactory {

public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = pipeline();

    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
    pipeline.addLast("decoder", new StringDecoder());
    pipeline.addLast("encoder", new StringEncoder());
    pipeline.addLast("handler", new ClientHandler());

    return pipeline;
}

}

服务器端:

package simple.server;

public class Server {
public static void main(String[] args) throws Exception {
    ServerChannelHandler.init();
}
}

ServerChannelHandler:

package simple.server;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.logging.Logger;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class ServerChannelHandler extends SimpleChannelHandler {

private static final Logger logger = Logger.getLogger(ServerChannelHandler.class.getName());

private static ChannelGroup channels;
private static ServerBootstrap bootstrap;

public ServerChannelHandler() {
}

/**
 * Initialize the Server Channel Handler
 */
public static void init() {
    // create a channels group to add incoming channels to
    channels = new DefaultChannelGroup();

    // create the server bootstrap (fancy word for pre-made server setup)
    bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

    // set the server pipeline factory
    bootstrap.setPipelineFactory(new ServerPipelineFactory());

    // server settings
    bootstrap.setOption("keepAlive", true);

    // bind the server to the port
    bootstrap.bind(new InetSocketAddress(Settings.PORT_ID));
}

@Override
public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Bound: " + e.getChannel());
}

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Connected: " + e.getChannel());
    channels.add(e.getChannel());
    e.getChannel().write("Welcome to the test server!\n\r");
}

@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Closed: " + e.getChannel());
}

@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Disconnected: " + e.getChannel());
}

@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
    System.out.println("Open: " + e.getChannel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    System.out.println("Error: " + e.getCause());
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    System.out.println("Message: " + e.getMessage());
    for (Channel c : channels) {
        if (e.getMessage().equals("shutdown")) {
            shutdown();
        }
        if (c != e.getChannel()) {
            c.write("[" + e.getChannel().getRemoteAddress() + "] " + e.getMessage() + "\n\r");
        } else {
            c.write("[You] " + e.getMessage() + "\n\r");
        }
    }
}

/**
 * Shuts down the server safely
 */
public static final void shutdown() {
    channels.close();
    bootstrap.releaseExternalResources();
    System.exit(0);
}
}

ServerPipelineFactory:

package simple.server;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

import simple.server.decoder.Decoder;
import simple.server.encoder.Encoder;

public class ServerPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = Channels.pipeline();

    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
    pipeline.addLast("decoder", new StringDecoder());
    pipeline.addLast("encoder", new StringEncoder());
    pipeline.addLast("handler", new ServerChannelHandler());

    return pipeline;
}
}

再次感谢大家帮助我理解这个问题。

非常抱歉,我不确定是否有人需要它,但我还应该补充一下,我可以通过Telnet本地主机45000连接到我的服务器,并且在那里它的表现符合预期..... 我正在编辑原帖以添加服务器代码。 - Maxs728
有人能帮我解决这个问题吗? - Maxs728
2个回答

5

你忘记在"test"后面添加\r\n了。应该是这样:channel.write("test\r\n")

从管道中可以看出,解码部分由两个处理程序组成。第一个将接收到的数据拆分并合并为单行字符串,并从中删除行尾。第二个将单行字符串转换为java.lang.String

在编码方面,只有一个处理程序,它将java.lang.String转换为ByteBuf,这就是它所做的全部工作。也许更好的做法是引入一个名为LineEncoderLineDecoderLineCodec的处理程序,它们执行通常期望的工作:https://github.com/netty/netty/issues/1811


1
这真是一个很愚蠢的回答,但具有讽刺意味的是它起作用了...哇,我不知道它对那些信息如此挑剔。 - Maxs728
我无法猜测这个。行为很奇怪,有什么原因吗? - Czechnology
已更新答案以回答评论中的问题。 - trustin

1

使用 new String("test") 更通用。回答你帖子后半部分 - 创建一个包含所有信息(如登录,玩家移动等)的类对象,并将其传递。确保你的类实现 Serializable 接口。将其作为字符串传递是一种不好的方式,因为我认为它将变得有点硬编码。客户端代码将如下所示:

ChannelPipeline p = ch.pipeline();
                        p.addLast(
                                new ObjectEncoder(),                            
                                new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
                                 new ClientHandler());              
                     }                  
                 });

                // Start the connection attempt.
            ChannelFuture f=  b.connect(host, port);
            channel=f.awaitUninterruptibly().channel();
            TestObj obj= new TestObj();
            channel.writeAndFlush(obj);

服务器端代码将如下所示:

 ChannelPipeline p = ch.pipeline();

                p.addLast(
                       new ObjectEncoder(),                         
                       new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
                        new DiscardServerHandler());
            }

服务器处理程序将是:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    System.out.println("channelRead"+((TestObj)msg).getCurrency());
}

这并没有回答问题。如果要批评或请求作者澄清,请在他们的帖子下留下评论 - 您始终可以在自己的帖子上发表评论,并且一旦您获得足够的 声望,您就可以评论任何帖子了。 - Achrome
我正在使用这个并且它很有效。使用ObjectEncoder和ObjectDecoder更适合他的用例,而不是使用String Decoder。那是他问题的一部分。尽管他的原始问题有一个被接受的答案,但我建议采用完全不同的方法。 - naves
如果您正在使用此功能并且它有效,请发布一段代码片段以进行说明。您当前的回答格式并没有真正帮助到我们。 - Achrome
强调我正在回答的问题部分:“如果你正在制作游戏(这就是我正在做的),并且你将拥有诸如登录、玩家移动、世界更新等功能...发送字符串是最好的方法吗?” - naves
我的代码有缺陷还是我误解了问题? - naves

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接