Netty与阻塞式套接字性能比较

6
我有一个简单的独立客户端-服务器测试,其中客户端向服务器发送500字节,并且服务器以2000字节的响应返回。它在循环中运行,每50000个此类请求/响应调用打印一次时间。我基于阻塞套接字API、Netty和NIO2实现了三种不同的方案,对它们的性能进行比较。测试表明,阻塞套接字的性能显著优于Netty或NIO2。虽然我知道这个测试没有并发处理(而NIO2正是为并发处理而设计的),但这种性能差异是否有解释?或者我做得非常低效?是否有任何方法可以改进基于Netty的代码,从而实现接近阻塞套接字的性能?我尝试使用直接缓冲区进行读取,但没有明显的差异。

测试是在1.7.0_55版本的Java上,在两台Linux服务器上进行的,它们连接在一个千兆网络上。以下是这些测试的前三个读数的结果(以毫秒为单位):

  • 阻塞:9754、9307、9305
  • Netty:14879、11872、11781
  • NIO2:14474、12117、12149

另一个谜团是,Netty和NIO2的实现在开始时运行缓慢,然后稳定下来。在Netty的情况下,稳定是在大约10000个周期之后发生的。

以下是源代码。

Config.java-由三种实现共同使用

/**
 * Shared configuration for the blocking, Netty and NIO2 benchmark variants.
 *
 * All fields are now {@code final}: nothing in any of the three implementations
 * reassigns them, so making them constants documents that intent and lets the
 * compiler inline the primitives. The byte arrays are reused as scratch buffers
 * by every implementation — their contents are overwritten at runtime, but the
 * references never change.
 */
public class Config {
    static final String HOST = "192.168.1.121";
    static final int PORT = 10000;

    // Payload sizes and the number of round trips per timing window.
    static final int requestLength = 500;
    static final int responseLength = 2000;
    static final int numOfCalls = 50000;

    // Shared scratch buffers: request is sent by the client, response by the server.
    static final byte[] request = new byte[requestLength];
    static final byte[] response = new byte[responseLength];
}

BlockingClient.java

public class BlockingClient {

    /**
     * Blocking-socket benchmark client: sends {@code Config.request} and reads a
     * full {@code Config.response} in a loop, printing the elapsed time every
     * {@code Config.numOfCalls} round trips.
     */
    public static void main(String[] args) {
        // try-with-resources closes the socket on every exit path and avoids the
        // NullPointerException the old finally block threw when the connect failed
        // (socket was still null when socket.close() ran).
        try (Socket socket = new Socket(Config.HOST, Config.PORT)) {
            InputStream is = socket.getInputStream();
            OutputStream os = socket.getOutputStream();
            int callCount = 0;

            long startTime = System.currentTimeMillis();

            while (true) {
                os.write(Config.request);
                read(is, Config.response);
                callCount++;
                if (callCount == Config.numOfCalls) {
                    System.out.println("numOfcalls=" + Config.numOfCalls + " time: " + (System.currentTimeMillis() - startTime));
                    callCount = 0;
                    startTime = System.currentTimeMillis();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads exactly {@code bytes.length} bytes from {@code is} into {@code bytes}.
     *
     * @throws java.io.EOFException if the stream ends before the buffer is filled.
     *         The original version added the -1 that {@code read()} returns at
     *         end-of-stream to its byte count, so it looped forever (or walked the
     *         count backwards) once the peer closed the connection.
     * @throws IOException on any other I/O error
     */
    public static void read(InputStream is, byte[] bytes) throws IOException {
        int num = 0;
        while (num < bytes.length) {
            int n = is.read(bytes, num, bytes.length - num);
            if (n < 0) {
                throw new java.io.EOFException("Stream closed after " + num + " of " + bytes.length + " bytes");
            }
            num += n;
        }
    }

}

BlockingServer.java

public class BlockingServer {

    /**
     * Blocking-socket benchmark server: one thread per connection. Each thread
     * answers every {@code Config.requestLength}-byte request with the canned
     * {@code Config.response} until the client disconnects.
     */
    public static void main(String[] args) {
        try {
            ServerSocket srvSocket = new ServerSocket(Config.PORT);

            while (true) {
                final Socket socket = srvSocket.accept();

                new Thread() {
                    @Override
                    public void run() {
                        try {
                            InputStream is = socket.getInputStream();
                            OutputStream os = socket.getOutputStream();
                            // readFully() returns false at end-of-stream, so a normal
                            // client disconnect ends the loop cleanly. The original
                            // helper added the -1 returned at EOF to its byte count
                            // and therefore never detected the end of the stream.
                            while (readFully(is, Config.request)) {
                                os.write(Config.response);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                socket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * Fills {@code bytes} completely from {@code is}.
     *
     * @return true if the buffer was filled; false if EOF arrived before any byte
     *         of a new request was read (i.e. a clean disconnect between requests)
     * @throws IOException if EOF occurs in the middle of a request, or on any
     *         other I/O error
     */
    private static boolean readFully(InputStream is, byte[] bytes) throws IOException {
        int num = 0;
        while (num < bytes.length) {
            int n = is.read(bytes, num, bytes.length - num);
            if (n < 0) {
                if (num == 0) {
                    return false; // clean EOF between requests
                }
                throw new java.io.EOFException("Stream closed after " + num + " of " + bytes.length + " bytes");
            }
            num += n;
        }
        return true;
    }

}

NettyClient.java

    public final class NettyClient {

        /**
         * Netty benchmark client entry point: connects to the server and parks
         * the main thread until the connection is torn down.
         */
        public static void main(String[] args) throws Exception {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // The whole protocol lives in a single handler.
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });

                // Connect, then block until the channel closes.
                bootstrap.connect(Config.HOST, Config.PORT).sync().channel().closeFuture().sync();

            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

NettyClientHandler.java

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    // Accumulates response chunks until a full Config.responseLength message has
    // arrived. Static: the benchmark drives a single connection, so one shared
    // buffer suffices.
    private static ByteBuf responseBuf = Unpooled.wrappedBuffer(Config.response).clear();
    //private static ByteBuf responseBuf = Unpooled.directBuffer(Config.responseLength).clear();

    private int readLen = 0;    // bytes of the current response received so far
    private int callCount = 0;  // completed round trips in the current window
    private long startTime;     // start of the current measurement window
    private long chunks = 0;    // channelRead invocations in the current window

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Kick off the ping-pong by sending the first request.
        resetCounters();
        sendRequest(ctx);
    }

    /**
     * Collects incoming chunks; once a complete response has been assembled,
     * resets the accumulator, updates the statistics and fires the next request.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf chunk = (ByteBuf) msg;
        int n = chunk.readableBytes();
        responseBuf.writeBytes(chunk);
        chunk.release();
        readLen += n;
        chunks++;

        if (readLen > Config.responseLength) {
            System.out.println("Error. readLen is too big: " + readLen);
            return;
        }
        if (readLen < Config.responseLength) {
            return; // partial response — wait for more chunks
        }

        // Full response assembled; sanity-check that the buffer is exactly full.
        if (responseBuf.isWritable()) {
            System.out.println("Error. responseBuf.isWritable()==true");
        }
        readLen = 0;
        responseBuf.clear();

        callCount++;
        if (callCount == Config.numOfCalls) {
            report();
            resetCounters();
        }
        sendRequest(ctx);
    }

    // Starts a fresh measurement window.
    private void resetCounters() {
        callCount = 0;
        chunks = 0;
        startTime = System.currentTimeMillis();
    }

    // Prints the statistics for the window that just completed.
    private void report() {
        System.out.println(Config.numOfCalls + " performed in " + chunks + " chunks, time: "+ (System.currentTimeMillis() - startTime));
    }

    private void sendRequest(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.wrappedBuffer(Config.request));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

NettyServer.java

public final class NettyServer {

    /**
     * Netty benchmark server entry point: a single event loop serves both the
     * acceptor and all child channels; blocks until the listener closes.
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // One handler implements the whole echo protocol.
                    ch.pipeline().addLast(new NettyServerHandler());
                }
            });

            // Bind, then block until the server channel is closed.
            bootstrap.bind(Config.PORT).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

NettyServerHandler.java

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // Accumulates request chunks until a full Config.requestLength message has
    // arrived. Static: the benchmark drives a single connection, so one shared
    // buffer suffices. NOTE(review): with multiple concurrent clients this
    // shared buffer would interleave their bytes — confirm single-client use.
    private static ByteBuf requestBuf = Unpooled.wrappedBuffer(Config.request).clear();
    //private static ByteBuf requestBuf = Unpooled.directBuffer(Config.requestLength).clear();;

    // Bytes of the current request received so far.
    private int readLen = 0;

    /**
     * Collects incoming chunks; once a complete request has arrived, clears the
     * accumulator and writes the canned response back to the client.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;

        int received = buf.readableBytes();
        requestBuf.writeBytes(buf);
        readLen += received;

        if (readLen == Config.requestLength) {
            if (requestBuf.isWritable()) {
                System.out.println("requestBuf.isWritable");
            }
            readLen = 0;
            requestBuf.clear();
            writeResponse(ctx);
        } else if (readLen > Config.requestLength) {
            // Bug fix: this previously compared against Config.responseLength
            // (2000) even though this side accumulates *requests* (500 bytes),
            // so an over-long request was never reported.
            System.out.println("readLen is too big: " + readLen);
        }

        buf.release();

    }

    private void writeResponse(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.wrappedBuffer(Config.response));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

Nio2Base.java

/**
 * Shared read/write completion-handler machinery for the NIO2 client and server.
 * Subclasses supply the buffers and the "full message received" callback; the
 * handlers ping-pong between reading a full buffer and writing a full buffer.
 */
public abstract class Nio2Base {

    public static int numOfCalls = 50000;

    abstract ByteBuffer getWriteBuffer();
    abstract ByteBuffer getReadBuffer();
    abstract void messageReceived(ByteBuffer buffer);

    /**
     * Keeps reading until the buffer is full, then hands the message to the
     * subclass and starts writing the next outgoing buffer.
     */
    protected class ReadHandler implements CompletionHandler<Integer, Void> {
        private AsynchronousSocketChannel channel;
        private ByteBuffer buffer;

        ReadHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) {
            this.channel = channel;
            this.buffer = buffer;
        }

        @Override
        public void completed(Integer result, Void a) {
            // Bug fix: a result of -1 means the peer closed the connection.
            // Without this check the buffer still had remaining space, so the
            // handler re-armed the read forever, spinning on -1 completions.
            if (result != null && result < 0) {
                closeQuietly(channel);
                return;
            }
            if (buffer.hasRemaining()) {
                channel.read(buffer, null, this); // partial read: keep reading
            } else {
                messageReceived(buffer);
                buffer.clear();
                ByteBuffer writeBuffer = getWriteBuffer();
                channel.write(writeBuffer, null, new WriteHandler(channel, writeBuffer));
            }

        }

        @Override
        public void failed(Throwable exc, Void a) {
            exc.printStackTrace();
        }

    }

    /**
     * Keeps writing until the buffer drains, then arms the next read.
     * (Unlike reads, writes never complete with -1, so no EOF check is needed.)
     */
    protected class WriteHandler implements CompletionHandler<Integer, Void> {
        private AsynchronousSocketChannel channel;
        private ByteBuffer buffer;

        WriteHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) {
            this.channel = channel;
            this.buffer = buffer;
        }

        @Override
        public void completed(Integer result, Void attachment) {
            if (buffer.hasRemaining()) {
                channel.write(buffer, null, this); // partial write: keep writing
            } else {
                buffer.clear();
                ByteBuffer readBuffer = getReadBuffer();
                channel.read(readBuffer, null, new ReadHandler(channel, readBuffer));
            }
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            exc.printStackTrace();
        }
    }

    // Best-effort close used when the peer disconnects.
    private static void closeQuietly(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

Nio2Client.java

public class Nio2Client extends Nio2Base {

    // Shared backing buffers: the request bytes that get sent, and the scratch
    // space the response is read into.
    private static ByteBuffer requestBuffer = ByteBuffer.wrap(Config.request);
    private static ByteBuffer readBuffer = ByteBuffer.wrap(Config.response);

    private int count;      // round trips completed in the current window
    private long startTime; // start of the current measurement window
    private AsynchronousSocketChannel channel;

    public static void main(String[] args) throws Exception {
        new Nio2Client().init();

        // Keep the JVM alive while the asynchronous I/O runs.
        System.in.read();
    }

    /** Opens the channel (default group) and starts an asynchronous connect. */
    public void init() {
        try {
            channel = AsynchronousSocketChannel.open();
            if (!channel.isOpen()) {
                System.out.println("The asynchronous socket channel cannot be opened!");
                return;
            }
            channel.connect(new InetSocketAddress(Config.HOST, Config.PORT), null, new ConnectHandler(channel));
        } catch (IOException ex) {
            System.err.println(ex);
        }
    }

    /** Fires the first request once the connection is established. */
    private class ConnectHandler implements CompletionHandler<Void, Void> {
        private AsynchronousSocketChannel channel;

        public ConnectHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Void result, Void attachment) {
            try {
                System.out.println("Successfully connected at: " + channel.getRemoteAddress());
                ByteBuffer firstRequest = getWriteBuffer();
                count = 0;
                startTime = System.currentTimeMillis();
                channel.write(firstRequest, null, new WriteHandler(channel, firstRequest));
            } catch (Exception e) {
                System.err.println(e);
            }
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            exc.printStackTrace();
            throw new UnsupportedOperationException("Connection cannot be established!");
        }

    }

    @Override
    ByteBuffer getWriteBuffer() {
        // A fresh view over the shared request bytes with position 0 and limit
        // at capacity — the same state the original produced via the
        // position(capacity)/flip() dance, since requestBuffer itself is never
        // advanced.
        return requestBuffer.duplicate();
    }

    @Override
    ByteBuffer getReadBuffer() {
        return (ByteBuffer) readBuffer.clear();
    }

    /** Counts completed round trips and logs the elapsed time per window. */
    @Override
    void messageReceived(ByteBuffer buffer) {
        count++;
        if (count != numOfCalls) {
            return;
        }
        System.out.println("Calls: " + count + " time: " + (System.currentTimeMillis() - startTime));
        count = 0;
        startTime = System.currentTimeMillis();
    }

}

Nio2Server.java

public class Nio2Server extends Nio2Base {

    // Canned response payload and the scratch buffer requests are read into.
    // NOTE(review): these are shared across all connections — fine for the
    // single-client benchmark, but would interleave with concurrent clients.
    private static byte[] response = new byte[Config.responseLength];
    private static ByteBuffer responseBuffer = ByteBuffer.wrap(response);
    private static ByteBuffer readBuffer = ByteBuffer.wrap(Config.request);


    public static void main(String[] args) {
        new Nio2Server().init();
    }

    /** Binds the asynchronous server socket and accepts clients until stdin is read. */
    public void init() {
        // The channel is opened on the default group and closed automatically.
        try (AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open()) {
            if (!serverChannel.isOpen()) {
                System.out.println("The asynchronous server-socket channel cannot be opened!");
                return;
            }

            serverChannel.bind(new InetSocketAddress(Config.HOST, Config.PORT));

            System.out.println("Waiting for connections ...");

            serverChannel.accept(null, new AcceptHandler(serverChannel));

            // Keep the JVM alive while the asynchronous I/O runs.
            System.in.read();
        } catch (IOException ex) {
            System.err.println(ex);
        }
    }

    /** Arms the first read on each accepted connection and re-arms accept. */
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
        private AsynchronousServerSocketChannel serverChannel;

        public AcceptHandler(AsynchronousServerSocketChannel serverChannel) {
            this.serverChannel = serverChannel;
        }

        @Override
        public void completed(AsynchronousSocketChannel channel, Void attachment) {
            // Immediately accept the next client before serving this one.
            serverChannel.accept(null, this);

            ByteBuffer buffer = getReadBuffer();
            try {
                System.out.println("Incoming connection from: " + channel.getRemoteAddress());
                channel.read(buffer, null, new ReadHandler(channel, buffer));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            exc.printStackTrace();
            serverChannel.accept(null, this);
            throw new UnsupportedOperationException("Cannot accept connections!");
        }
    }

    @Override
    ByteBuffer getWriteBuffer() {
        return responseBuffer.duplicate();
    }

    @Override
    ByteBuffer getReadBuffer() {
        return (ByteBuffer) readBuffer.clear();
    }

    // The server ignores request contents; it only echoes the canned response.
    @Override
    void messageReceived(ByteBuffer buffer) {
    }


}

你的 BlockingServer 是有缺陷的:它不能正确地检测到流的结束(read() 在流结束时返回 -1,而你的代码把它当作已读取的字节数累加了)。 - user207421
1个回答

1
我认为缓慢的启动是因为JIT需要时间进行预热,并且NIO比简单的阻塞IO更加复杂(需要更多的优化)。 当只有少量客户端时,我认为阻塞IO的性能会更高,因为Netty和NIO会因其复杂性而产生开销。然而,NIO比阻塞IO更具可扩展性(特别是使用Netty的epoll后端),可以轻松处理数千个客户端。
另外,最重要的是,过早进行优化是万恶之源。 如果你正在编写一个简单的命令行应用程序,Netty和NIO都是过度设计,你应该坚持使用阻塞IO。 然而,如果你打算编写一个健壮、可维护和高质量的网络应用程序,你应该使用Netty。 如果你决定从NIO切换回阻塞IO,也可以在Netty内部无缝完成,因为Netty还提供了一个阻塞式IO后端(blocking IO backend),官方"建议将其用于连接数较少的场景"。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接