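Update: OK, since this is my first post here, and perhaps because I have seen zero feedback, I may have been too wordy. So I decided to add a diagram to better visualize my problem. Maybe someone will take a look (I'm looking at you, Norman Maurer ;o) )
Can anyone confirm my findings and explain how I can prevent a long-running task from blocking other short requests? More generally, what is the recommended scalable, non-blocking solution for handling long-running jobs on a Netty server?
I have spent the past two weeks learning Netty 4/5 and reading a lot about its threading model (including Netty in Action). I then checked my understanding with a simple test application, to see how a Netty server responds to several simple concurrent TCP clients, some of which request a long-running job on the server. My results do not match my expectations.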
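My setup:
- Netty 4.1.45
- JDK 1.8 on Windows 10
- 15 concurrent connections opened to the server
- 1 client requests the long-running job (simulated on the server side as a sleep)
My server is initialized as follows: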
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyChannelInitializer(new DefaultEventExecutorGroup(5)));
ChannelFuture future = bootstrap.bind(new InetSocketAddress(SERVER_PORT));
future.addListener(new MyChannelFutureListener());
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("Error: ", e);
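} finally {
// Shut down both groups; releasing only bossGroup leaves the worker threads running.
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully().sync();
}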
In the channel initializer I have the following code:
public class MyChannelInitializer extends ChannelInitializer<Channel> {
private EventExecutorGroup handlerGroup = null;
public MyChannelInitializer() {
}
public MyChannelInitializer(EventExecutorGroup handlerGroup) {
this.handlerGroup = handlerGroup;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());
}
}
As you can see, I want my inbound handler to run in a separate EventExecutorGroup so that it does not block the NIO event loop. My inbound handler's logic is very simple:
public class MyTestInboudHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LogManager.getLogger((MyTestInboudHandler.class));
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
if (msg.isReadable()) {
String content = msg.toString(CharsetUtil.UTF_8);
int len = content.length();
if (len > 120) {
content = content.substring(0, 119);
}
String clientId = content.substring(0, content.indexOf(':'));
String payload = content.substring(content.indexOf(':') + 1);
logger.info(ctx.channel().hashCode() + ": received message of length: " + len + " -- " + content + "<<<");
if (payload.equals("LONG_TASK_REQ")) {
// simulate load run
Thread.sleep(50_000L);
ctx.writeAndFlush(Unpooled.copiedBuffer("DoneBIG", CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE);
} else {
ctx.writeAndFlush(Unpooled.copiedBuffer("Done", CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE);
}
} else {
logger.info("ByteBuf not readable...");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("{} - Exception in input handler: {} ", ctx.channel().hashCode(), cause.getMessage());
ctx.close();
}
}
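Client 13 sends LONG_TASK_REQ in its request payload, which means a long task has to be executed (for example a JDBC call to a database); I simulate this with a 50-second sleep. All other requests are handled immediately by replying "Done" and closing the connection.
My expected result is to see every client complete its call to the server immediately, except client 13, which should take 50 seconds. Here is the client implementation, which also uses Netty: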
public class MyNettyClient extends Thread {
private static final Logger logger = LogManager.getLogger(MyNettyClient.class);
private int id;
private MyNettyClient() {
}
public MyNettyClient(int id) {
this();
this.id = id;
}
public void run() {
logger.info("Netty client starting with id: " + this.id);
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress("127.0.0.1", 8778))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyNettyClientHandler(id));
}
});
ChannelFuture future = bootstrap.connect().sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("Error from client " + id + " : ", e);
} finally {
logger.info("Exiting client: " + id);
try {
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
logger.error("Error from client " + id, e);
}
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<?>> futureList = new ArrayList<Future<?>>();
// Start 15 clients (ids 1..15) to match the 15 concurrent connections in the setup.
for (int i = 1; i <= 15; ++i) {
futureList.add(executor.submit(new MyNettyClient(i)));
}
for (Future<?> f : futureList) {
f.get();
if (f.isCancelled()) {
logger.info("One future was cancelled: " + f.toString());
}
}
logger.info("Calling shutdown on executor");
executor.shutdown();
logger.info("MAIN THREAD DONE");
}
}
Here is the logic of the handler that writes the call to the Netty server:
public class MyNettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LogManager.getLogger(MyNettyClientHandler.class);
private int clientId;
public MyNettyClientHandler(int id) {
clientId = id;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// SimpleChannelInboundHandler releases msg after this method returns. Overriding
// channelRead as well bypassed channelRead0 and left the buffer unreleased.
logger.info("Read from server by client " + clientId + ": " + msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String content = null;
if (clientId % 13 == 0) {
content = clientId + ":LONG_TASK_REQ";
} else {
content = clientId + ":SMALL_TASK_REQ";
}
// Avoid sync() here: channelActive runs on the event loop, and blocking it risks a BlockingOperationException.
ctx.writeAndFlush(Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
logger.info("Send for client {} completed successfully.", clientId);
} else {
logger.error("Send failed for client {}. Reason: {}", clientId, f.cause().getMessage());
}
});
logger.info("Write issued by client " + clientId);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Exception in client {}, type: {} ", clientId, cause.getMessage());
ctx.close();
}
}
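The results I get show that while the server receives and processes the long request for 50 seconds, it always blocks other requests made by different clients on different channels. It seems that a channel is bound to one thread of the EventExecutorGroup when it becomes active, and that all later activity on that channel is handled by the same thread, no matter how busy that thread is or how many idle threads are in the pool. This is what I really don't understand, because the whole purpose of the EventExecutorGroup is to handle long synchronous jobs without interfering with other clients.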
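To make the suspected behaviour concrete, here is a minimal JDK-only model. It is not Netty code, and `PinnedExecutorModel` and `blockedChannels` are names made up for this sketch. It assumes what the results above suggest: each channel's handler is pinned to one single-threaded executor, picked round-robin in connection order, so a short request queues behind the long task whenever both channels landed on the same executor.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model: channels pinned round-robin to single-threaded executors. */
public class PinnedExecutorModel {

    /** Returns the channels whose short task cannot finish while the slow task is running. */
    static List<Integer> blockedChannels(int channelCount, int executorCount, int slowChannel) {
        ExecutorService[] executors = new ExecutorService[executorCount];
        for (int i = 0; i < executorCount; i++) {
            executors[i] = Executors.newSingleThreadExecutor();
        }
        CountDownLatch slowTaskMayFinish = new CountDownLatch(1);
        try {
            // The slow task occupies the single thread of its executor until the latch opens.
            executors[(slowChannel - 1) % executorCount].submit(() -> {
                slowTaskMayFinish.await();
                return null;
            });
            // Assumption: channel ids 1..n are assigned to executors round-robin.
            Map<Integer, Future<?>> shortTasks = new LinkedHashMap<>();
            for (int id = 1; id <= channelCount; id++) {
                if (id != slowChannel) {
                    shortTasks.put(id, executors[(id - 1) % executorCount].submit(() -> { }));
                }
            }
            List<Integer> blocked = new ArrayList<>();
            for (Map.Entry<Integer, Future<?>> entry : shortTasks.entrySet()) {
                try {
                    entry.getValue().get(1, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    // Still queued behind the slow task on the same executor.
                    blocked.add(entry.getKey());
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return blocked;
        } finally {
            slowTaskMayFinish.countDown();
            for (ExecutorService executor : executors) {
                executor.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(blockedChannels(15, 5, 13)); // prints [3, 8]
    }
}
```

Under these assumptions, channels 3 and 8 stall because they share executor index 2 with channel 13, while the other twelve finish at once. In the real test the connection order is not the client ID order, so which clients stall would vary from run to run.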
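I know I could implement my own worker thread pool to run and synchronize long jobs, but the question remains: why can we write
ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());
if the result is this limited? Please advise.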
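For comparison, here is a sketch of the worker-pool alternative mentioned above, again JDK-only and with hypothetical names (`SharedPoolModel`, `blockedChannels`). Every request goes to one shared fixed-size pool instead of a per-channel executor, so any idle thread can pick up a short request while one thread is stuck in the long task. In a Netty handler this would presumably mean submitting the blocking work to such a pool from `channelRead0` and writing the response once it completes, at the cost of per-channel ordering for the offloaded work.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model: all channels submit their work to one shared thread pool. */
public class SharedPoolModel {

    /** Returns the channels whose short task cannot finish while the slow task is running. */
    static List<Integer> blockedChannels(int channelCount, int poolSize, int slowChannel) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch slowTaskMayFinish = new CountDownLatch(1);
        try {
            // The slow task ties up one pool thread; the remaining threads stay available.
            pool.submit(() -> {
                slowTaskMayFinish.await();
                return null;
            });
            Map<Integer, Future<?>> shortTasks = new LinkedHashMap<>();
            for (int id = 1; id <= channelCount; id++) {
                if (id != slowChannel) {
                    shortTasks.put(id, pool.submit(() -> { }));
                }
            }
            List<Integer> blocked = new ArrayList<>();
            for (Map.Entry<Integer, Future<?>> entry : shortTasks.entrySet()) {
                try {
                    entry.getValue().get(1, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    blocked.add(entry.getKey());
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return blocked;
        } finally {
            slowTaskMayFinish.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockedChannels(15, 5, 13)); // prints []
    }
}
```

With a pool of 5 threads and one long task, no short request is held up. The pool only helps while fewer long tasks are running than there are threads, so its size has to be chosen with the expected number of concurrent long jobs in mind.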
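ch.pipeline().addLast(handlerGroup, new MyTestInboudHandler());
? The documentation and the books mention it as the solution for handling long-running jobs efficiently in Netty. But as noted, it is not actually the right approach: you have to handle this explicitly with an "external" ExecutorService. - Gilmour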