如何实现一个异步的HTTP客户端输入流,而不是字节数组输入流?

3
我正在使用异步Http客户端从互联网下载大量(可能很大)的文件。在我的情况下,我需要将这些下载URL的字节InputStream发送到另一个服务进行解析。一个天真的做法是这样的:
AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
    .setMaxConnectionsPerHost(-1)
    .setMaxConnections(-1)
    .setPooledConnectionIdleTimeout(60 * 10 * 1000)
    .setConnectionTtl(6 * 60 * 1000)
    .setConnectTimeout(5 * 1000)
    .setRequestTimeout(5 * 60 * 1000)
    .setFollowRedirect(true)
    .setRealm(new Realm.Builder(username, password)
        .setNtlmDomain(domain)
        .setScheme(Realm.AuthScheme.NTLM)
        .build())
Response httpGetResponse = asyncHttpClient.prepareGet(url).execute().get();
return httpGetResponse.getResponseBodyAsStream();

但是在异步Http请求的教程中,我们了解到与HTTP Components Http客户端不同的是,异步Http客户端将整个文件下载到内存中。这在我的情况下很快会导致OOM(内存溢出)问题。因此,另一种选择是:
Response httpGetResponse = asyncHttpClient.prepareGet(url).execute(new AsyncHandler<Response>() {
    private final Response.ResponseBuilder builder = new Response.ResponseBuilder();

    @Override
    public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
      bodyPart.getBodyByteBuffer(); // Each chunk of bytes will be fed into this method.
                                    // I need to write these bytes to the resuting input stream
                                    // without streaming them all into memory.
      return State.CONTINUE;
    }

    @Override
    public State onHeadersReceived(HttpHeaders headers) throws Exception {
      builder.accumulate(headers);
      return State.CONTINUE;
    }

    @Override
    public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
      builder.accumulate(responseStatus);
      return State.CONTINUE;
    }

    @Override
    public Response onCompleted() throws Exception {
      return builder.build();
    }

    @Override
    public void onThrowable(Throwable t) {

    }
  }).get();

什么是获取输入流中字节的最简单、最干净的方法?
我有两个想法:
1)将输入写入文件,然后流式传输文件 或者 2)立即返回一个管道输入流,字节将在接收到它们时被写入管道输入流。
是否有人可以分享一个可用的示例?
1个回答

4

我正确地假设有人已经完成了这个任务。实际上,在我搜索“异步http客户端”和“管道输入流”后,我在项目本身中找到了这个:

https://github.com/AsyncHttpClient/async-http-client/blob/master/client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java

用法:

  PipedInputStream pipedInputStream = new PipedInputStream();
  PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
  BodyDeferringAsyncHandler bodyDeferringAsyncHandler = new BodyDeferringAsyncHandler(pipedOutputStream);
  Future<Response> futureResponse = asyncHttpClient.prepareGet(url).execute(bodyDeferringAsyncHandler);
  Response response = bodyDeferringAsyncHandler.getResponse();
  if (response.getStatusCode() == 200) {
    return new BodyDeferringAsyncHandler.BodyDeferringInputStream(futureResponse,
        bodyDeferringAsyncHandler,
        pipedInputStream);
  } else {
    return null;
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接