如何在代理 Netty 服务器中,在请求被写入 outboundChannel 的同一处理程序中获取响应的 byteBuf?

4
我正在实现 netty 代理服务器,如下所示: 当 HTTP 请求到来时,
  • 如果本地缓存有数据,将其写入通道并刷新
  • 如果没有,则从远程服务器获取数据,将其添加到缓存并刷新
在同一处理程序中,我在向客户端编写内容时,难以从响应中提取 ByteBuf。
在下面的示例中,如果您查看 HexDumpProxyFrontendHandler 的 channelRead 方法,您将看到我如何从缓存中获取并编写数据。在我遇到困难的位置下面,我已经添加了注释。
该代码按预期工作,因此可以复制并在本地测试。
我可以在 HexDumpProxyBackendhandler#channelRead 中看到 FullHttpResponse 对象。但是,在此方法内部,我没有对缓存或我想要添加到缓存中的 ID 的引用。
我认为有两种方法可以解决这个问题,但我不确定如何做到这一点。
1)在 HexdumpProxyBackendHandler 中获取缓存引用和 id,然后就变得容易了。但是,在 HexDumpFrontendHandler 的 channelActive 中实例化 hexDumpBackendhander,此时我还没有解析传入的请求。
2)获取从 HexdumpFrontendHandler#dchannelRead 提取的响应 ByteBuf,在这种情况下,只需插入缓存即可。 HexDumpProxy.java
public final class HexDumpProxy {

static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
static Map<Long,String> localCache = new HashMap<>();
public static void main(String[] args) throws Exception {
    System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
    localCache.put(123L, "profile1");
    localCache.put(234L, "profile2");
    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
         .childOption(ChannelOption.AUTO_READ, false)
         .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

}

HexDumpProxyInitializer.java

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

private final String remoteHost;
private final int remotePort;
private Map<Long, String> cache;

public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache=cache;
}

@Override
public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(
            new LoggingHandler(LogLevel.INFO),
            new HttpServerCodec(),
            new HttpObjectAggregator(8*1024, true),
            new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));
}

}

HexDumpProxyFrontendHandler.java

 public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
private Map<Long, String> cache;

public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache = cache;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
     .channel(ctx.channel().getClass())
     .handler((new ChannelInitializer() {
         protected void initChannel(Channel ch) {
             ChannelPipeline var2 = ch.pipeline();
             var2.addLast((new HttpClientCodec()));
             var2.addLast(new HttpObjectAggregator(8192, true));
             var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
         }
     }))
     .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        System.out.println("msg is instanceof httpRequest");
        HttpRequest req = (HttpRequest)msg;
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        String userId = queryStringDecoder.parameters().get("id").get(0);
        Long id = Long.valueOf(userId);
        if (cache.containsKey(id)){
            StringBuilder buf = new StringBuilder();
            buf.append(cache.get(id));
            writeResponse(req, ctx, buf);
            closeOnFlush(ctx.channel());
            return;
        }
    }
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // was able to flush out data, start to read the next chunk
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    //get response back from HexDumpProxyBackendHander and write to cache
    //basically I need to do cache.put(id, parse(response));
    //how to get response buf from inboundChannel here is the question I am trying to solve
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    if (outboundChannel != null) {
        closeOnFlush(outboundChannel);
    }

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    closeOnFlush(ctx.channel());
}

/**
 * Closes the specified channel after all queued write requests are flushed.
 */
static void closeOnFlush(Channel ch) {
    if (ch.isActive()) {
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

//borrowed from HttpSnoopServerHandler.java in snoop example
private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);
    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
            Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));

    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
        // Add 'Content-Length' header only for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Add keep alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    // Encode the cookie.
    String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieString != null) {
        Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
        if (!cookies.isEmpty()) {
            // Reset the cookies if necessary.
            for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
                response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
            }
        }
    } else {
        // Browser sent no cookie.  Add some.
        response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
        response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
    }

    // Write the response.
    ctx.write(response);

    return keepAlive;
}

}

HexDumpProxyBackendHandler.java

public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {

private final Channel inboundChannel;

public HexDumpProxyBackendHandler(Channel inboundChannel) {
    this.inboundChannel = inboundChannel;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.read();
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof FullHttpResponse) {
        System.out.println("this is fullHttpResponse");
    }
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
}

}

P.S: 我使用了来自 netty-example 项目的大部分代码,并进行了定制。

编辑

根据 Ferrygig 的建议,我修改了 FrontEndChannelHander#channelRead 方法如下。我删除了 channelActive 并实现了 write 方法。

@Override public void channelRead(final ChannelHandlerContext ctx, Object msg) {

if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    id = Long.valueOf(userId);
    if (cache.containsKey(id)){
        StringBuilder buf = new StringBuilder();
        buf.append(cache.get(id));
        writeResponse(req, ctx, buf);
        closeOnFlush(ctx.channel());
        return;
    }

    final Channel inboundChannel = ctx.channel();

    //copied from channelActive method

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
            .channel(ctx.channel().getClass())
            .handler((new ChannelInitializer() {
                protected void initChannel(Channel ch) {
                    ChannelPipeline var2 = ch.pipeline();
                    var2.addLast((new HttpClientCodec()));
                    var2.addLast(new HttpObjectAggregator(8192, true));
                    var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
                }
            }));
            //.option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}
if (outboundChannel.isActive()) {
    outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // was able to flush out data, start to read the next chunk
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

你觉得这个链接 https://dev59.com/qWAg5IYBdhLWcg3w1t1a#35318079 是否能够帮助你将一个出站响应与一个入站响应关联起来呢?(免责声明:这是我以前的回答之一) - Ferrybig
我看了一下,但在我的情况下无法使其工作。如果您能让它工作,请发布一下! - brain storm
@Ferrybig 有什么想法吗? - brain storm
2个回答

1

风暴

当我阅读你的HexDumpProxyFrontendHandler的这一部分时,我感觉可能有些不正确(我将我的评论放在正确的风格前面,以使它们可见):

 // Not incorrect but better to have only one bootstrap and reusing it
    Bootstrap b = new Bootstrap(); 
    b.group(inboundChannel.eventLoop())
            .channel(ctx.channel().getClass())
            .handler(new HexDumpProxyBackendHandler(inboundChannel))
 // I know what AUTO_READ false is, but my question is why you need it?
            .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
 // Strange to me to try to get the channel while you did not test yet it is linked
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
 // Maybe you should start to send there, therefore getting the outboundChannel right there?
 // add a log in order to see if you come there
 // probably you have to send first, before asking to read anything?
 // position (1)
                inboundChannel.read();
            } else {
                inboundChannel.close();
            }
        }
    });
 // I suggest to move this in position named (1)
    if (outboundChannel.isActive()) {
 // maybe a log to see if anything will be written?
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    System.out.println("success!! - FrontEndHandler");
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }

对我来说,你似乎没有等待通道打开。当你发送到线路时,缺少一些日志,以确保你真的发送了一些东西(在日志中,我们只能看到连接已打开,然后主要是关闭,之间没有任何内容)。
也许更多的日志可以帮助我们和你吗?

这段代码基本上是从Netty示例存储库中获取的。我只需要进行一些小的更改。https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy - brain storm
我已经在这个问题上开了悬赏。我正在寻找一个可行的解决方案,因为它将对更广泛的受众有用。 - brain storm
请注意,这有点混乱,可能各个位置不是正确的地方,就像我建议的那样?它在channelRead覆盖方法中,就像你的一样。但是outboundChannel的打开是在channelActive中完成的,而不是在同一个方法中。从我的角度来看,似乎这两个步骤应该分开(在一个地方打开和开始读取,在另一个地方写入)。当然,我可能是错的。 - Frederic Brégier
感谢您关注此事。我已编辑完整的帖子,清楚地说明了我所面临的问题。之前有点模糊,对此我深表歉意。上面贴出的整个代码都是有效的。然而,如果您查看代码,就会看到我的真正问题所在。感谢您的帮助。 - brain storm

1

有多种方法来解决这个问题,而选择的方法取决于您的最终目标。

目前,您正在使用1个连接入站和1个连接出站的拓扑结构,这使系统设计稍微容易一些,因为您不必担心将多个请求同步到同一个出站流中。

目前,您的前端处理程序扩展了ChannelInboundHandlerAdapter,这仅拦截进入应用程序的“数据包”,如果我们将其扩展为ChannelDuplexHandler,我们还可以处理离开应用程序的“数据包”。

要走这条路,我们需要更新HexDumpProxyFrontendHandler类以扩展ChannelDuplexHandler(现在先称之为CDH)。

在这个过程中的下一步,是要覆盖来自CDH的write方法,以便我们可以拦截后端发送给我们的响应。
在我们创建了write方法之后,我们需要通过调用put方法来更新我们的(非线程安全)映射。
public class HexDumpProxyFrontendHandler extends ChannelDuplexHandler {
    Long lastId;
    // ...
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            System.out.println("msg is instanceof httpRequest");
            HttpRequest req = (HttpRequest)msg;
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
            String userId = queryStringDecoder.parameters().get("id").get(0);
            Long id = Long.valueOf(userId);
            lastId = id; // Store ID of last request
            // ...
        }
        // ...
    }
    // ...
    public void write(
        ChannelHandlerContext ctx,
        java.lang.Object msg,
        ChannelPromise promise
    ) throws java.lang.Exception {

        if (msg instanceof FullHttpResponse) {
            System.out.println("this is fullHttpResponse");
            FullHttpResponse full = (FullHttpResponse)msg;
            cache.put(lastId, parse(full)); // TODO: Include a system here to convert the request to a string
        }
        super.write(ctx, msg, promise);
    }
    // ...
}

我们还没有完成,虽然代码已经放置好了,但是我们仍需要修复代码中其他地方的一些错误。 非线程安全的映射表(严重错误) 其中一个错误是你正在使用普通的哈希映射表来处理缓存。这样做的问题在于它不是线程安全的,如果多人同时连接到你的应用程序,可能会发生奇怪的事情,包括映射表内部结构的完全破坏。
为了解决这个问题,我们将“升级”映射表为ConcurrentHashMap,这个映射表具有特殊的结构来处理多个线程同时请求和存储数据,而不会在性能上出现巨大损失。(如果性能是主要关注点,您可能可以通过使用每个线程的哈希映射表而不是全局缓存来获得更高的性能,但这意味着每个资源都可以缓存到线程的数量。) 没有缓存删除规则(重大错误)

目前还没有代码来移除过时的资源,这意味着缓存会填满,直到程序没有剩余内存,然后它将崩溃。

可以通过使用提供线程安全访问和所谓的删除规则的映射实现,或使用已经预先制作的缓存解决方案(如Gnuava caches)来解决此问题。

未正确处理HTTP Pipelining(次要-主要错误)

HTTP的一个较少知道的功能是pipelining,这基本上意味着客户端可以向服务器发送另一个请求,而无需等待上一个请求的响应。此类错误包括服务器交换两个请求的内容,甚至完全搞乱它们。

虽然随着越来越多的HTTP2支持和对存在损坏服务器的认识,流水线请求现在很少见,但某些使用它的CLI工具仍然会发生。

为了解决这个问题,仅在发送前一个响应后才读取请求,其中一种方法是保持请求列表,或者选择更高级的预制解决方案

感谢您提供的解决方法和有关错误的提示。我已经意识到前两个问题,因为这只是起步阶段。管道技术是我今天学到的新知识!我在这里有一个问题,就是在frontendHandler#channelActive()中,我创建了一个s bootStrapserver。如果我的数据已经在缓存中,那么这是无用的。我该如何避免这种情况? - brain storm
1
根据你目前的架构,要适当解决起来并不容易,但你应该从代码中移除读取逻辑,并让它自动读取(从初始化器中删除“AUTO_READ, false”,以及所有对“channel.read”方法的调用,并在“channelActive”中注释掉连接代码,然后在“channelRead”内部,不要直接将请求管道化到后端,而是检查“channel”是否为空,如果为空,则建立新连接并发送数据,否则直接发送数据。 - Ferrybig
Netty有两种不同的接收数据系统,一种是自动完成,另一种是手动完成(autoread true/false)。我们希望将代码从autoread: false转换为autoread: true,由于现在读取是自动完成的,所以现在可以删除所有对读取的调用。由于您只想在出现未缓存的请求时连接到服务器一次,现在必须检查是否已经打开了一个连接,因为您不能向一个未打开的连接发送数据。 - Ferrybig
我已经编辑了上面的帖子,并对frontendHandler中的channelRead进行了更改。我所做的唯一其他更改是在HexDumpProxy中注释掉了auto_read.,false。现在服务器没有返回任何结果...它只是挂起,从未触发backEndHandler调试器。 - brain storm
让我们在聊天室中继续这个讨论 - Ferrybig
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接