Netty OIO client-server remains blocked in java.net.SocketOutputStream.write(byte[])

3

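EDIT: Created a GitHub repository: https://github.com/istiillyes/client-server-netty

I created a client-server with Netty 4.0.15.Final and ran some tests using OIO and NIO.

I am sending strings of varying sizes [1KB, 10KB, 100KB].

I need the server and the client to be able to send messages in parallel, so the test looks like this:

  1. Start the server (create the channel that accepts connections)
  2. Start the client (create the channel that connects to the server)
  3. When the channel becomes active, send 100 messages from the client to the server (and vice versa).

With NIO, the messages are transmitted and everything works.

With OIO, however, both the server and the client block in java.net.SocketOutputStream.write(byte[]) after some time and never return.

Any idea why this happens? Am I doing something wrong in how I use Netty?

I ran the same test with plain Java sockets and it worked. So I guess that either I am not using Netty correctly, or this is a bug.

I have added the code that creates the channels and the channel handlers below.

Here is the stack trace from the client, captured with YourKit: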

pool-1-thread-1 [RUNNABLE, IN_NATIVE]
java.net.SocketOutputStream.write(byte[])
io.netty.buffer.UnpooledUnsafeDirectByteBuf.getBytes(int, OutputStream, int)
io.netty.buffer.AbstractByteBuf.readBytes(OutputStream, int)
io.netty.channel.oio.OioByteStreamChannel.doWriteBytes(ByteBuf)
io.netty.channel.oio.AbstractOioByteChannel.doWrite(ChannelOutboundBuffer)
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0()
io.netty.channel.AbstractChannel$AbstractUnsafe.flush()
io.netty.channel.DefaultChannelPipeline$HeadHandler.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.handler.logging.LoggingHandler.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.write(Object, boolean, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.writeAndFlush(Object, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.writeAndFlush(Object)
io.netty.channel.DefaultChannelPipeline.writeAndFlush(Object)
io.netty.channel.AbstractChannel.writeAndFlush(Object)
client.ClientHandler.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.handler.logging.LoggingHandler.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.DefaultChannelPipeline.fireChannelActive()
io.netty.channel.oio.AbstractOioChannel$DefaultOioUnsafe.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelPipeline$HeadHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelDuplexHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.handler.logging.LoggingHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelPipeline.connect(SocketAddress, ChannelPromise)
io.netty.channel.AbstractChannel.connect(SocketAddress, ChannelPromise)
io.netty.bootstrap.Bootstrap$2.run()
io.netty.channel.ThreadPerChannelEventLoop.run()
io.netty.util.concurrent.SingleThreadEventExecutor$2.run()
java.lang.Thread.run()

Code that creates the acceptor channel:

final class ServerChannelFactory {

    private static final Logger LOGGER = Logger.getLogger(ServerChannelFactory.class);

    protected static Channel createAcceptorChannel(
            final ChannelType channelType,
            final InetSocketAddress localAddress,
            final ServerHandler serverHandler
    ) {
        final ServerBootstrap serverBootstrap = ServerBootstrapFactory.createServerBootstrap(channelType);

        serverBootstrap
                .childHandler(new ServerChannelInitializer(serverHandler))
                .option(ChannelOption.SO_BACKLOG, Resources.SO_BACKLOG);

        try {
            ChannelFuture channelFuture = serverBootstrap.bind(
                    new InetSocketAddress(localAddress.getPort())).sync();
            channelFuture.awaitUninterruptibly();
            if (channelFuture.isSuccess()) {
                return channelFuture.channel();

            } else {
                LOGGER.warn(String.format("Failed to open socket! Cannot bind to port: %d!",
                        localAddress.getPort()));
            }
        } catch (InterruptedException e) {
            LOGGER.error("Failed to create acceptor socket.", e);
        }
        return null;
    }

    private static class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

        private ChannelHandler serverHandler;

        private ServerChannelInitializer(ChannelHandler serverHandler) {
            this.serverHandler = serverHandler;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Encoders
            ch.pipeline().addLast(Resources.STRING_ENCODER_NAME, new StringEncoder(CharsetUtil.UTF_8));
            ch.pipeline().addBefore(Resources.STRING_ENCODER_NAME, Resources.FRAME_ENCODER_NAME,
                    new LengthFieldPrepender(Resources.FRAME_LENGTH_FIELD_SIZE));

            // Decoders
            ch.pipeline().addLast(Resources.STRING_DECODER_NAME, new StringDecoder(CharsetUtil.UTF_8));
            ch.pipeline().addBefore(Resources.STRING_DECODER_NAME, Resources.FRAME_DECODER_NAME,
                    new LengthFieldBasedFrameDecoder(Resources.MAX_FRAME_LENGTH,
                            Resources.FRAME_LENGTH_FIELD_OFFSET, Resources.FRAME_LENGTH_FIELD_SIZE,
                            Resources.FRAME_LENGTH_ADJUSTMENT, Resources.FRAME_LENGTH_FIELD_SIZE));

            // Handlers
            ch.pipeline().addLast(Resources.LOGGING_HANDLER_NAME, new LoggingHandler());
            ch.pipeline().addLast(Resources.SERVER_HANDLER_NAME, serverHandler);
        }
    }
}

Server handler:

final class ServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = Logger.getLogger(ServerHandler.class);
    int noMessagesReceived = 0;

    @Override
    public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
        for(int i=0; i< Resources.NO_MESSAGES_TO_SEND; i++) {
            ctx.channel().writeAndFlush(MessageStore.getMessage(i));
        }
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        noMessagesReceived++;
        if(noMessagesReceived == Resources.NO_MESSAGES_TO_SEND) {
            ctx.channel().writeAndFlush(MessageStore.getMessage(0));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        FileUtils.write(Resources.SERVER_OUTPUT, String.format("Received %d messages", noMessagesReceived));
    }

    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
        LOGGER.error(String.format("Exception in %s", this.getClass().getName()), cause);
    }
}

Server bootstrap factory:

public class ServerBootstrapFactory {

    private ServerBootstrapFactory() {
    }

    public static ServerBootstrap createServerBootstrap(final ChannelType channelType) throws UnsupportedOperationException {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        switch (channelType) {
            case NIO:
                serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup());
                serverBootstrap.channel(NioServerSocketChannel.class);
                return serverBootstrap;

            case OIO:
                serverBootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup());
                serverBootstrap.channel(OioServerSocketChannel.class);
                return serverBootstrap;

            default:
                throw new UnsupportedOperationException("Failed to create ServerBootstrap,  " + channelType + " not supported!");
        }
    }
}

Code that creates the connector channel:

final class ClientChannelFactory {

    private static final Logger LOGGER = Logger.getLogger(ClientChannelFactory.class);

    protected static Channel createConnectorChannel(
            ChannelType channelType,
            final ClientHandler clientHandler,
            InetSocketAddress remoteAddress
    ) {
        final Bootstrap bootstrap = BootstrapFactory.createBootstrap(channelType);

        bootstrap.handler(new ClientChannelInitializer(clientHandler));

        try {
            final ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(remoteAddress.getAddress(), remoteAddress.getPort()))
                    .sync();
            channelFuture.awaitUninterruptibly();
            if (channelFuture.isSuccess()) {
                return channelFuture.channel();

            } else {
                LOGGER.warn(String.format(
                        "Failed to open socket! Cannot connect to ip: %s port: %d!",
                        remoteAddress.getAddress(), remoteAddress.getPort())
                );
            }
        } catch (InterruptedException e) {
            LOGGER.error("Failed to open socket!", e);
        }
        return null;
    }

    private static class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

        private ChannelHandler clientHandler;

        private ClientChannelInitializer(ChannelHandler clientHandler) {
            this.clientHandler = clientHandler;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Encoders
            ch.pipeline().addLast(Resources.STRING_ENCODER_NAME, new StringEncoder(CharsetUtil.UTF_8));
            ch.pipeline().addBefore(Resources.STRING_ENCODER_NAME, Resources.FRAME_ENCODER_NAME,
                    new LengthFieldPrepender(Resources.FRAME_LENGTH_FIELD_SIZE));

            // Decoders
            ch.pipeline().addLast(Resources.STRING_DECODER_NAME, new StringDecoder(CharsetUtil.UTF_8));
            ch.pipeline().addBefore(Resources.STRING_DECODER_NAME, Resources.FRAME_DECODER_NAME,
                    new LengthFieldBasedFrameDecoder(Resources.MAX_FRAME_LENGTH,
                            Resources.FRAME_LENGTH_FIELD_OFFSET, Resources.FRAME_LENGTH_FIELD_SIZE,
                            Resources.FRAME_LENGTH_ADJUSTMENT, Resources.FRAME_LENGTH_FIELD_SIZE));

            // Handlers
            ch.pipeline().addLast(Resources.LOGGING_HANDLER_NAME, new LoggingHandler());
            ch.pipeline().addLast(Resources.CLIENT_HANDLER_NAME, clientHandler);
        }
    }
}

Client handler:

public final class ClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = Logger.getLogger(ClientHandler.class);
    private int noMessagesReceived = 0;

    @Override
    public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
        for(int i=0; i< Resources.NO_MESSAGES_TO_SEND; i++) {
            ctx.channel().writeAndFlush(MessageStore.getMessage(i));
        }
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        noMessagesReceived++;
        if (noMessagesReceived > Resources.NO_MESSAGES_TO_SEND) {
            ctx.channel().close();
        }
    }

    @Override
    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        FileUtils.write(Resources.CLIENT_OUTPUT, String.format("Received %d messages", noMessagesReceived));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error(String.format("Exception in %s", this.getClass().getName()), cause);
    }
}

Bootstrap factory:

public class BootstrapFactory {

    private BootstrapFactory() {
    }

    public static Bootstrap createBootstrap(final ChannelType channelType) throws UnsupportedOperationException {
        Bootstrap bootstrap = new Bootstrap();

        switch (channelType) {
            case NIO:
                bootstrap.group(new NioEventLoopGroup());
                bootstrap.channel(NioSocketChannel.class);
                return bootstrap;

            case OIO:
                bootstrap.group(new OioEventLoopGroup());
                bootstrap.channel(OioSocketChannel.class);
                return bootstrap;

            default:
                throw new UnsupportedOperationException("Failed to create Bootstrap,  " + channelType + " not supported!");
        }
    }
}

Channel type:

public enum ChannelType {

    // New IO - non-blocking
    NIO,

    // Old IO - blocking
    OIO;
}

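I would like to try this code. How do I run the client? - Kishore
You can find a very good Netty tutorial here: http://seeallhearall.blogspot.ro/2012/05/netty-tutorial-part-1-introduction-to.html - Illyes Istvan
Thanks. I will definitely go through the tutorial. I have a quick question: how do I create an OIO UDT channel? I cannot find a suitable channel for it. I already have a working NIO UDT channel, but now I want OIO UDT. - Kishore
Do you mean UDP? You can use OioDatagramChannel. - Illyes Istvan
No, there is a protocol called UDT. It is supported by Netty. UDT is built on top of UDP. - Kishore
I don't think OIO UDT channels are supported at the moment. If you look at the UDTChannel implementations, you will see that they are all for NIO. - Illyes Istvan
2 Answers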

4
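Because Netty's OIO transport performs reads and writes on the same thread, it does not read while a write is in progress.
The problem is that if both the client and the server use the OIO transport, each may end up writing to the other while waiting for the peer to read what it is writing, and neither ever does.
The workaround is: 1) use NIO on at least one side, or 2) be very careful not to fill the peer's socket receive buffer to its maximum size. In practice, (2) is hard to achieve, so using the NIO transport at least on the server side is recommended.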
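
The mutual-write stall described in this answer can be reproduced without Netty. The following is only a minimal sketch with plain java.net sockets on loopback, not Netty's actual implementation: the class name MutualWriteDemo, the 32 MB payload (assumed to exceed the combined socket buffers) and the timeouts are all illustrative assumptions. Each peer writes its whole payload before reading, as a single OIO thread would; when one side reads on a second thread while writing, the exchange should complete.

```java
import java.io.EOFException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class MutualWriteDemo {

    // Assumption: 32 MB is larger than the send + receive buffers of a loopback connection.
    private static final int PAYLOAD = 32 * 1024 * 1024;
    private static final int CHUNK = 64 * 1024;

    private interface IoTask { void run() throws Exception; }

    // Wraps a task in a daemon thread so a stuck peer cannot keep the JVM alive.
    private static Thread daemon(IoTask task) {
        Thread t = new Thread(() -> {
            try { task.run(); } catch (Exception ignored) { /* socket closed while blocked */ }
        });
        t.setDaemon(true);
        return t;
    }

    private static void writeAll(Socket socket) throws Exception {
        OutputStream out = socket.getOutputStream();
        byte[] chunk = new byte[CHUNK];
        for (int sent = 0; sent < PAYLOAD; sent += CHUNK) {
            out.write(chunk); // blocks once the peer's receive buffer and our send buffer are full
        }
        out.flush();
    }

    private static void readAll(Socket socket) throws Exception {
        InputStream in = socket.getInputStream();
        byte[] buf = new byte[CHUNK];
        for (long got = 0; got < PAYLOAD; ) {
            int n = in.read(buf);
            if (n < 0) throw new EOFException();
            got += n;
        }
    }

    /** Returns true if both peers finished their write and read within about five seconds. */
    static boolean completes(boolean readWhileWritingOnOneSide) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket a = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket b = server.accept()) {

            // Peer A always behaves like a single I/O thread: write everything, then read.
            Thread peerA = daemon(() -> { writeAll(a); readAll(a); });

            // Peer B either does the same, or keeps reading on a second thread while it writes.
            Thread peerB = readWhileWritingOnOneSide
                    ? daemon(() -> {
                        Thread reader = daemon(() -> readAll(b));
                        reader.start();
                        writeAll(b);
                        reader.join();
                    })
                    : daemon(() -> { writeAll(b); readAll(b); });

            peerA.start();
            peerB.start();
            peerA.join(4000);
            peerB.join(1000);
            return !peerA.isAlive() && !peerB.isAlive();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("write-then-read on both sides completes: " + completes(false));
        System.out.println("one side reads while writing completes: " + completes(true));
    }
}
```

The second case is only an analogy for workaround (1): a side that keeps draining its socket while it writes plays the role that the NIO transport plays in the answer.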

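Thanks for the answer, Trustin. - Illyes Istvan
1
In Netty 3.x, if you always write from a separate thread (not the I/O worker thread), the write is handled by the calling thread, so reads are never blocked by writes (reads are handled on the I/O worker thread). Is that right? - Illyes Istvan
The problem is in the code that configures the pipeline. In the last step, you need to use an EventLoopGroup instance to decouple the clienthandler/serverhandler from the current read thread, so that things are processed on a different thread. - Claudiu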

1

write() blocks when the sender far outpaces the receiver. Mixing blocking and non-blocking I/O is not a good idea.
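
To see that blocking behaviour in isolation, here is a small sketch with plain java.net sockets; it is an illustration under stated assumptions, not code from the question. The class name BlockingWriteDemo, the 32 MB total (assumed to be more than the loopback socket buffers can hold) and the one-second observation window are made up for the example. The writer thread should stall while nobody reads and finish once the receiver drains the connection.

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class BlockingWriteDemo {

    // Assumption: 32 MB cannot fit into the socket buffers of a loopback connection.
    private static final int TOTAL_BYTES = 32 * 1024 * 1024;
    private static final int CHUNK = 64 * 1024;

    /** Returns true if the writer stalled while nobody read and finished after the peer drained. */
    static boolean writerStallsUntilPeerReads() throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket sender = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket receiver = server.accept()) {

            Thread writer = new Thread(() -> {
                try {
                    OutputStream out = sender.getOutputStream();
                    byte[] chunk = new byte[CHUNK];
                    for (int sent = 0; sent < TOTAL_BYTES; sent += CHUNK) {
                        out.write(chunk); // blocks here once the buffers are full
                    }
                    out.flush();
                } catch (Exception ignored) {
                    // The socket was closed while the thread was blocked; nothing to do in this sketch.
                }
            });
            writer.setDaemon(true);
            writer.start();

            // Nobody reads for one second, so the writer should still be stuck in write().
            writer.join(1000);
            boolean stalled = writer.isAlive();

            // Drain the receiving side; this frees buffer space and lets the writer continue.
            InputStream in = receiver.getInputStream();
            byte[] buf = new byte[CHUNK];
            long received = 0;
            while (received < TOTAL_BYTES) {
                int n = in.read(buf);
                if (n < 0) {
                    break;
                }
                received += n;
            }

            writer.join(5000);
            return stalled && !writer.isAlive() && received == TOTAL_BYTES;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("writer stalled until the peer read: " + writerStallsUntilPeerReads());
    }
}
```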

