使用Netty HTTP客户端重试请求

4

如何在基于Netty的HTTP客户端中重试HTTP请求?

考虑以下处理程序,如果收到HTTP响应代码503,则在1秒后尝试重试HTTP请求:

public class RetryChannelHandler extends ChannelDuplexHandler {
    List<HttpObject> requestParts;

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof HttpRequest) {
            requestParts = new ArrayList<>();
            requestParts.add((HttpRequest)msg);
        } else if (msg instanceof HttpObject) {
            requestParts.add((HttpObject)msg);
        }

        super.write(ctx, msg, promise);
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpResponse) {
            HttpResponse res = (HttpResponse)msg;
            if (res.status().code() == 503) {
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        for (HttpObject obj : requestParts) {
                            ctx.channel().write(obj);
                        }
                    }
                }, 1000, TimeUnit.MILLISECONDS);
            } else {
                super.channelRead(ctx, msg);
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }
}

当我在这个例子中向通道写入内容时,管道中的其他处理程序会看到HttpObjects,但实际上并不会再次执行HttpRequest——只收到一个HttpResponse。
我认为我在这种情况下简单地误用了Channel,并且我需要创建一个新的Channel(代表与服务器的新连接)来执行重试。对我来说不清楚的是如何从处理程序的上下文中创建新的Channel,以及我是否真的在正确的Netty层中进行这种逻辑。
如果有关于如何实现我所描述的行为的任何指导将不胜感激。
1个回答

3
在调用write(...)之后,您还需要调用flush(),否则它将不会刷新通道。此外,您需要确保正确使用retain()duplicate()来处理HttpContent对象,否则您可能会尝试写入已释放的HttpContent对象。

类似于以下内容(未经测试):

public class RetryChannelHandler extends ChannelDuplexHandler {
    Queue<HttpObject> requestParts;

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof HttpRequest) {
            requestParts = new ArrayDeque<>();
            requestParts.add((HttpRequest)msg);
        } else if (msg instanceof HttpContent) {
            requestParts.add(((HttpContent)msg).duplicate().retain());
        }

        super.write(ctx, msg, promise);
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpResponse) {
            HttpResponse res = (HttpResponse)msg;
            if (res.status().code() == 503) {
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        HttpObject obj;
                        while ((obj = requestParts.poll()) != null) {
                            ctx.write(obj);
                        }
                        ctx.flush();
                    }
                }, 1000, TimeUnit.MILLISECONDS);
            } else {
                HttpObject obj;
                while ((obj = requestParts.poll()) != null) {
                    ReferenceCountUtil.release(obj);
                }
                super.channelRead(ctx, msg);
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }
}

谢谢!在稍微调整实现后,这解决了我的问题。 - Rikki Gibson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接