Netty线程模型与长时间运行的任务

4

更新: 好的,因为这是我在这里发布的第一篇文章,可能是因为我看到了零反馈,我可能太啰嗦了。所以,我决定添加一个图表来更好地可视化我的问题。也许有人会看看(我在看着你,Norman Maurer ;o) )

netty threading problem

请问有谁能确认我的发现并解释一下我如何防止长时间运行的任务阻塞其他短请求?更一般地说,在处理Netty服务器上的长时间运行的作业时,建议采用什么可扩展且不锁定的解决方案?


过去两周我一直在学习Netty 4/5,并阅读了很多关于其线程模型的资料(包括《Netty实战》)。然后我用一个简单的测试应用程序检查了自己的理解,想查看Netty服务器对多个TCP简单并发客户端的响应情况,其中一些客户端请求服务器上的长时间运行作业。我的结果与预期不符。

我的设置:

  • netty 4.1.45
  • 在Windows 10上使用jdk 1.8
  • 向服务器打开15个并发连接
  • 1个客户端请求长时间运行的作业(在服务器端模拟为睡眠)

我的服务器初始化如下:

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();

try {
    bootstrap
        .group(bossGroup, workGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new MyChannelInitializer(new DefaultEventExecutorGroup(5)));
            
    ChannelFuture future = bootstrap.bind(new InetSocketAddress(SERVER_PORT));
    future.addListener(new MyChannelFutureListener());

    future.channel().closeFuture().sync();
            
} catch (InterruptedException e) {
    logger.error("Error: ", e);
} finally {
    bossGroup.shutdownGracefully().sync();
}

在通道的初始化程序中,我有以下代码:
public class MyChannelInitializer extends ChannelInitializer<Channel> {

    private EventExecutorGroup handlerGroup = null;

    public MyChannelInitializer() {
    }

    public MyChannelInitializer(EventExecutorGroup handlerGroup) {
        this.handlerGroup = handlerGroup;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
        ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());
    }
}

如您所见,我希望我的入站处理程序在单独的eventExecutionGroup中执行,以免阻塞NIO事件循环。 我的入站处理程序逻辑非常简单:
public class MyTestInboudHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger logger = LogManager.getLogger((MyTestInboudHandler.class));

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        if (msg.isReadable()) {
            String content = msg.toString(CharsetUtil.UTF_8);
            int len = content.length();
            if (len > 120) {
                content = content.substring(0, 119);
            }

            String clientId = content.substring(0, content.indexOf(':'));
            String payload = content.substring(content.indexOf(':') + 1);

            logger.info(ctx.channel().hashCode() + ": received message of length: " + len + " -- " + content + "<<<");

            if (payload.equals("LONG_TASK_REQ")) {
                // simulate load run
                Thread.sleep(50_000L);
                ctx.writeAndFlush(Unpooled.copiedBuffer("DoneBIG", CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE);
            } else {
                ctx.writeAndFlush(Unpooled.copiedBuffer("Done", CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE);
            }

        } else {
            logger.info("ByteBuf not readable...");
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        logger.error("{} - Exception in input handler: {} ", ctx.channel().hashCode(), cause.getMessage());
        ctx.close();

    }
}

如果在请求有效负载中找到的客户端ID等于13,则意味着需要执行长时间的任务(例如向数据库进行JDBC调用),我使用50秒的长时间休眠来模拟此过程。所有其他请求都通过简单回复“完成”并关闭连接来立即处理。
我的预期结果是看到所有客户端都立即完成对服务器的调用,除了客户端13需要50秒才能完成。以下是客户端的实现,也使用Netty:
public class MyNettyClient extends Thread {

    private static final Logger logger = LogManager.getLogger(MyNettyClient.class);

    private int id;

    private MyNettyClient() {
    }

    public MyNettyClient(int id) {
        this();
        this.id = id;
    }

    public void run() {

        logger.info("Netty client starting with id: " + this.id);

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(group)
                .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("127.0.0.1", 8778))
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyNettyClientHandler(id));
                        }
                    });

            ChannelFuture future = bootstrap.connect().sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("Error from client " + id + " : ", e);
        } finally {
            logger.info("Exiting client: " + id);
            try {
                group.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                logger.error("Error from client " + id, e);
            }
        }

    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<?>> futureList = new ArrayList<Future<?>>();

        for (int i = 1; i < 15; ++i) {
            futureList.add(executor.submit(new MyNettyClient(i)));
        }

        for (Future<?> f : futureList) {
            f.get();
            if (f.isCancelled()) {
                logger.info("One future was cancelled: " + f.toString());
            }
        }

        logger.info("Calling shutdown on executor");
        executor.shutdown();

        logger.info("MAIN THREAD DONE");
    }
}

以下是将调用写入Netty服务器的处理程序的逻辑:
public class MyNettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger logger = LogManager.getLogger(MyNettyClientHandler.class);

    private int clientId;

    public MyNettyClientHandler(int id) {
        clientId = id;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        logger.info("@@@@@Message received: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf b = (ByteBuf) msg;
        String serverMsg = b.toString(CharsetUtil.UTF_8);
        logger.info("Read from server by client " + clientId + ": " + serverMsg);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        String content = null;

        if (clientId % 13 == 0) {
            content = clientId + ":LONG_TASK_REQ";
        } else {
            content = clientId + ":SMALL_TASK_REQ";
        }
        ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)).sync();

        if (f.isDone()) {
             logger.info("Send for client {} completed successfully.", clientId);
        } else {
            logger.error("Future not done for client {}. Reason: {}", clientId, f.cause().getMessage());
        }

        logger.info("Finish sending from client " + clientId);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        logger.error("Exception in client {}, type: {} ", clientId, cause.getMessage());

        ctx.close();
    }
}

我得到的结果表明,当服务器接收并在50秒内处理长请求时,它总是会阻止不同客户端在不同通道上进行的其他请求。似乎通道在激活时绑定到EventExecutionGroup的某个线程,并且之后在该通道上进行的所有活动都将由相同的线程处理,无论该线程有多忙,池中有多少空闲线程。这是我真正不理解的事情,因为EventExecutionGroup的整个目的是处理长时间的同步作业,而不对其他客户端产生任何干扰。
我知道我可以实现自己的工作线程池来处理和同步长作业,但问题仍然存在:为什么我们可以编写ch.pipeline().addLast(handlerGroup,new MyTestInboudHandler());,如果结果如此受限。
请给予建议。
1个回答

0
一个 EventLoopGroup 只是一组 EventLoop 的集合。 每个 EventLoop 是一个独立的线程,负责一个或多个 Channel,并在其生命周期内属于该通道。 这种关系是一对多的,一个单独的 EventLoop 可以处理多个 Channel
当通道注册到 EventLoopGroup 后,它会将它们分配给一个新的 EventLoop(如果容量允许),或者一个现有的 EventLoop(通过循环轮询)。 这意味着一个长时间运行的任务可能会在一个通道的 EventLoop 上运行,而该 EventLoop 同时也在处理来自其他客户端的短时间运行任务的 Channels,这就导致了你所经历的阻塞。
你可以做的一件事是将这些长时间运行的任务从你的 MyTestInboundHandler 中转移到一个单独的 ExecutorService 中。这样,长时间运行的任务就不会阻塞,而只是将处理分派到另一个线程。
请注意,当您将任务转移到单独的ExecutorService时,它将在单独的线程中运行,并且要写入通道,写操作Channel.writeAndFlush(data)实际上会将任务加入到ChannelEventLoop队列中,并在下一次迭代中运行它,而不是立即运行它。
例如:
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    myExecutor.submit(() -> {
        byte[] response = doSomeLongTask(msg);
        ctx.channel().writeAndFlush(response); // Submits to channel's EventLoop queue to be written
    }
}

现在来回答主要问题: 这与为每个处理程序添加EventExecutor有什么区别?

您几乎可以通过执行相同的操作来实现相同的结果。

ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());

主要区别在于,如果您将自己的handlerGroup添加到每个处理程序中,则每个套接字事件都将在该组中运行(强调我的):[1]

用户在向ChannelPipeline添加处理程序时可以指定EventExecutor。

  • 如果指定了,则始终使用指定的EventExecutor调用ChannelHandler的处理程序方法。
  • 如果未指定,则处理程序方法始终由其关联的Channel注册的EventLoop调用。
EventExecutor和EventLoop分配给处理器或通道时,始终是单线程的。
处理器方法将始终由同一线程调用。
如果指定了多线程的EventExecutor或EventLoop,则首先选择其中一个线程,然后选择的线程将在取消注册之前使用。
如果相同管道中的两个处理器被分配给不同的EventExecutors,则它们将同时被调用。即使共享数据仅由同一管道中的处理器访问,用户也必须注意线程安全性,如果多个处理器访问共享数据。
这意味着对于发生在管道上的每个处理器事件以及分配了自己的执行器的处理器来说,这些完整的方法(如channelRead0())将在指定执行器的自己的线程中运行。
如果您不想为处理器指定执行器,也可以通过重写每个处理器方法并立即将调用转移到自定义执行器来实现。注意:您需要重写每个方法,未重写的方法仍将在netty IO EventLoop线程中运行。

因此,它们实际上几乎相同。 通过使用分配给处理程序的执行器,您仍然受到单个线程的限制,因为Netty将重用相同的线程来处理每个处理程序。但好处是,每个处理程序方法将自动在与通道分配的 EventLoop 不同的单独线程 中运行。

通过使用自己的执行器并将任务卸载给它,EventLoop仍然是在同一线程中调用每个处理程序方法,但您可以将实际工作卸载到自己的执行器中。这样,执行器也可以利用超过 1 个线程来同时处理每个这些任务。


感谢@pierater的回答,基本上证实了我的发现和理解。不幸的是,它仍然没有回答我在最后一段中的问题。也就是说,编写此代码的好处是什么:ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());?文档和书籍将其提及为在netty中高效处理长时间运行的作业的解决方案。但正如我们所提到的,这实际上并不是正确的方法。您必须使用“外部”ExecutorService对象显式处理此问题。 - Gilmour
@Gilmour 我已经更新了我的答案。它是否解决了你的问题? - pierater
1
嗨@pierater!现在你的最新更新让我不同意你。:-) 特别是这部分:“你几乎可以达到相同的结果”。我的实际测试显示,在不使用专用EventExecutor时,我在不同客户端请求之间出现了争用!在大多数情况下,这是不可接受的。因此,我不建议将此方法用于任何事情!对我来说,只有使用EventExecutor选项才值得实施。另一个选项是一个陷阱。因为如果处理单个请求时出现任何问题,可能会影响多个不相关的客户端。 - Gilmour

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接