使用Netty实现的异步HTTP客户端

10

我是netty的新手,仍在努力寻找方向。我想创建一个可以异步工作的http客户端。 netty的http示例仅展示如何等待IO操作,而没有展示如何使用addListener,因此我一直在尝试弄清楚这个问题。

我正在尝试创建一个请求类,它将处理请求的所有不同状态,从连接、发送数据、处理响应到关闭连接。 为了做到这一点,我的类扩展了SimpleChannelUpstreamHandler并实现了ChannelFutureListener。我使用了一个ChannelPipelineFactory,它将该类(作为SimpleChannelUpstreamHandler)作为处理程序添加到了管道中。

连接是通过以下方式创建的:

this.state = State.Connecting;
this.clientBootstrap.connect(this.address).addListener(this);

接下来是 operationComplete 方法:

@Override
public void operationComplete(ChannelFuture future) throws Exception {
    State oldState = this.state;

    if (!future.isSuccess()) {
        this.status = Status.Failed;
        future.getChannel().disconnect().addListener(this);
    }
    else if (future.isCancelled()) {
        this.status = Status.Canceled;
        future.getChannel().disconnect().addListener(this);
    }
    else switch (this.state) {
        case Connecting:
            this.state = State.Sending;
            Channel channel = future.getChannel();
            channel.write(this.createRequest()).addListener(this);
            break;

        case Sending:
            this.state = State.Disconnecting;
            future.getChannel().disconnect().addListener(this);
            break;

        case Disconnecting:
            this.state = State.Closing;
            future.getChannel().close().addListener(this);
            break;

        case Closing:
            this.state = State.Finished;
            break;
    }
    System.out.println("request operationComplete start state: " + oldState + ", end state: " + this.state + ", status: " + this.status);
}

private HttpRequest createRequest() {
    String url = this.url.toString();

    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
    request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);

    return request;
}

该类还重写了messageReceived方法:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    System.out.println("messageReceived");
    HttpResponse response = (HttpResponse) e.getMessage();

    ChannelBuffer content = response.getContent();
    if (content.readable()) {
        System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
    }
}
问题在于我得到了这个输出:
request operationComplete start state: Connecting, end state: Sending, status: Unknown
request operationComplete start state: Sending, end state: Disconnecting, status: Unknown
request operationComplete start state: Closing, end state: Finished, status: Unknown
request operationComplete start state: Disconnecting, end state: Finished, status: Unknown

如您所见,messageReceived方法没有被执行,尽管Pipeline Factory将该类的实例添加到了Pipeline中。

您有什么想法或建议吗?感谢您。


编辑

在@JestanNirojan的帮助下,我终于解决了这个问题。如果有人对解决方案感兴趣,请参考:

public class ClientRequest extends SimpleChannelUpstreamHandler {

    ....

    public void connect() {
        this.state = State.Connecting;
        System.out.println(this.state);
        this.clientBootstrap.connect(this.address);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Sending;
        System.out.println(this.state);
        ctx.getChannel().write(this.createRequest());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HttpResponse response = (HttpResponse) e.getMessage();

        ChannelBuffer content = response.getContent();
        if (content.readable()) {
            System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
        }

        this.state = State.Disconnecting;
        System.out.println(this.state);
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Closing;
        System.out.println(this.state);
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Finished;
        System.out.println(this.state);
    }

    private HttpRequest createRequest() {
        String url = this.url.toString();

        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
        request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);

        return request;
    }
}

HttpResponse是完整的HttpResponse还是可以是一个块?我有成千上万的块返回,想要每个块一个事件,否则内存会爆炸导致内存不足。 - Dean Hiller
HttpResponse 是完整的响应,据我所知,您无法将其分块。您应该更低级别地处理,可能使用 HttpResponseDecoder。 - Nitzan Tomer
如果您不感兴趣分块,请使用此处的轻量级HTTP客户端@https://github.com/arungeorge81/netty-http-client。 - Arun George
1个回答

3
您正在使用ChannelFutureListener在通道中执行所有操作(这是不好的),并且future listener将在调用这些通道操作后立即执行。
问题在于,发送消息后,通道立即断开连接,处理程序无法接收稍后到达的响应消息。
        ........
    case Sending:
        this.state = State.Disconnecting;
        future.getChannel().disconnect().addListener(this);
        break;
        ........

你不应该完全阻塞通道的未来线程。最好的方法是扩展SimpleChannelUpstreamHandler的功能。
    channelConnected(..) {} 
    messageReceived(..) {} 
    channelDisconnected(..) {} 

可以使用方法来处理和响应这些事件。您也可以在该处理程序中保留状态。


1
哦,那很简单。非常感谢您提供的信息,但我希望Netty在这方面有更好的文档资料。 - Nitzan Tomer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接