使用RestTemplate获取InputStream

47

我正在使用URL类从中读取InputStream。是否有任何方法可以使用RestTemplate来实现这个功能?

InputStream input = new URL(url).openStream();
JsonReader reader = new JsonReader(new InputStreamReader(input, StandardCharsets.UTF_8.displayName())); 

如何使用RestTemplate获取InputStream而不是使用URL

9个回答

58
之前的回答并没有错,但不够深入。在处理低级别的InputStream时,有时不仅是可取的,而且是必要的,最常见的例子是从源头(如web服务器)流式传输大文件到目标地(如数据库)。如果你尝试使用ByteArrayInputStream,你将会遇到预期之外的OutOfMemoryError。是的,你可以编写自己的HTTP客户端代码,但是你将不得不处理错误响应代码、响应转换器等问题。如果你已经使用了Spring,那么寻找RestTemplate就是一个自然的选择。
截至本文撰写时,spring-web:5.0.2.RELEASE拥有一个ResourceHttpMessageConverter,它有一个boolean supportsReadStreaming,如果设置为true,并且响应类型为InputStreamResource,则返回InputStreamResource;否则返回ByteArrayResource。因此,很明显,你不是唯一一个需要流支持的人。
然而,存在一个问题:RestTemplateHttpMessageConverter运行后很快就关闭了响应。因此,即使你请求了InputStreamResource并收到了它,但它没有用,因为响应流已经关闭了。我认为这是一个设计缺陷,他们忽视了这一点;它应该依赖于响应类型。因此,很遗憾,对于读取,你必须完全消耗响应;如果使用RestTemplate,则不能传递它。
但是写入没有问题。如果你想要流式传输InputStreamResourceHttpMessageConverter将为你完成。在幕后,它使用org.springframework.util.StreamUtils每次从InputStream中写入4096字节到OutputStream中。

一些HttpMessageConverter支持所有媒体类型,因此根据您的要求,您可能需要从RestTemplate中删除默认值,并设置您需要的值,谨慎考虑它们的相对顺序。

最后,如果您正在上传大量数据流,则ClientHttpRequestFactory的实现具有boolean bufferRequestBody,您可以和应该将其设置为false。否则,会出现OutOfMemoryError错误。截至本文撰写时,SimpleClientHttpRequestFactory(JDK客户端)和HttpComponentsClientHttpRequestFactory(Apache HTTP客户端)支持此功能,但OkHttp3ClientHttpRequestFactory不支持。同样是设计上的疏忽。

编辑:已提交工单SPR-16885


3
@Kieveli,我不确定你所说的“没有例子”是什么意思。我提供了实际应用案例的参考,比如流媒体。本答案的目标并不是要提供编码上的指导。 - Abhijit Sarkar
不确定这是否是设计缺陷。我认为这只是设计决策的结果,即让RestTemplate在调用中管理资源。当我尝试使用在JdbcTemplate中检索的InputStream时,我遇到了同样的问题。它已经关闭了结果集,因此关闭了CLOB字段的输入流。但是解释得很好。 - David Bradley
只有在通过POST或PUT发送文件时才应考虑使用“bufferRequestBody”。如果我们谈论从HTTP响应中读取InputStream,则可以忽略它。 - valijon

49

Spring有一个org.springframework.http.converter.ResourceHttpMessageConverter,它可以将Spring的org.springframework.core.io.Resource类转换为其他格式。

Resource类封装了一个InputStream,可以通过someResource.getInputStream()获取。

把这些组合在一起,您可以通过将Resource.class指定为RestTemplate调用的响应类型,从RestTemplate开箱即用地获取InputStream

以下是使用RestTemplateexchange(..)方法之一的示例:

import org.springframework.web.client.RestTemplate;
import org.springframework.http.HttpMethod;
import org.springframework.core.io.Resource;

ResponseEntity<Resource> responseEntity = restTemplate.exchange( someUrlString, HttpMethod.GET, someHttpEntity, Resource.class );

InputStream responseInputStream;
try {
    responseInputStream = responseEntity.getBody().getInputStream();
}
catch (IOException e) {
    throw new RuntimeException(e);
}

// use responseInputStream

4
responseEntity.getBody().getInputStream();是不正确的。没有getInputStream方法。 - brain storm
1
@brainstorm,比 spring-core-3.1.1.RELEASE 版本旧和新的版本都有一个 org.springframework.core.io.Resource,它继承了 org.springframework.core.io.InputStreamSource 并提供了 getInputStream() - Abdull
30
底层输入流将是ByteArrayInputStream,这意味着响应体将被加载到内存中。 - Prof Mo
1
它对我没有用。很明显,它将所有内容加载到内存中。因此,上面的响应是不正确的。 - Jurass

25
您不应直接获取InputStreamRestTemplate旨在封装响应(和请求)内容的处理。它的优势在于处理所有IO并将一个准备就绪的Java对象交给您。 RestTemplate的原始作者之一Brian Clozel已经表明

RestTemplate不适用于流式响应体; 它的合同不允许它,并且它已经存在了很长时间,改变其行为的这个基本部分会干扰许多应用程序。

您需要注册适当的HttpMessageConverter对象。这些对象将通过HttpInputMessage对象访问响应的InputStream

正如Abdull所建议的那样, Spring自带了一个HttpMessageConverter实现Resource,它本身包装了一个InputStreamResourceHttpMessageConverter。它不支持所有的Resource类型,但既然你应该按照接口编程,那么你应该只使用超级接口Resource

当前实现(4.3.5)将返回一个ByteArrayResource,其中包含响应流的内容复制到一个新的ByteArrayInputStream中,您可以访问它。

你不需要关闭流,RestTemplate会为你处理。如果你尝试使用InputStreamResource(另一种被ResourceHttpMessageConverter支持的类型),这就很不幸了,因为它包装了底层响应的InputStream但在暴露给客户端代码之前就已经关闭了。

6
您不应直接获取InputStream。 RestTemplate旨在封装响应(和请求)内容的处理。它的优势在于处理所有IO并提供一个可直接使用的Java对象。这完全是错误的。在某些情况下,序列化对象比堆能够容纳的更大,因此您必须流式传输它的序列化而不是将其转换为字符串或任何其他序列化介质,然后再将其写入输出流缓冲区中。 - Dragas
2
@Dragas 在 InputStream 的一般情况下,你说得没错。然而,RestTemplate 并不支持这种流式使用情况。你可以在其中一个原始作者(Brian Clozel)的评论中看到这一点这里RestTemplate 不是用来流式传输响应体的; 它的契约不允许它,而且它已经存在了很长时间,改变其行为的这个基本部分是不可能的,会破坏许多应用程序。 - Sotirios Delimanolis

8

请问您能否在GitHub上的存储库中添加许可证?我想在我的项目中使用它,但是根据这个链接,我不能这样做:https://opensource.stackexchange.com/questions/1720/what-can-i-assume-if-a-publicly-published-project-has-no-license - afrish
1
当然,@afrish。由于我从未这样做过,您能告诉我需要添加什么吗?我上传它的目的是让任何需要它的人都可以使用它。很高兴看到您发现它有用。 - ItamarBe
我认为你可以按照这个链接 https://docs.github.com/en/communities/setting-up-your-project-for-healthy-contributions/adding-a-license-to-a-repository 来选择MIT或Apache许可证。 - afrish

6

一种非常简单但有效的解决方案是使用ResponseExtractor。当您想要在非常大的InputStream上进行操作并且内存有限时,这种方法尤其有用。

以下是实现方法:

public void consumerInputStreamWithoutBuffering(String url, Consumer<InputStream> streamConsumer) throws IOException {

    final ResponseExtractor responseExtractor =
            (ClientHttpResponse clientHttpResponse) -> {
                streamConsumer.accept(clientHttpResponse.getBody());
                return null;
            };

    restTemplate.execute(url, HttpMethod.GET, null, responseExtractor);
}

然后,在需要的任何地方调用该方法:

Consumer<InputStream> doWhileDownloading = inputStream -> {
                //Use inputStream for your business logic...
};

consumerInputStreamWithoutBuffering("https://localhost.com/download", doWhileDownloading);

请注意以下常见陷阱:
public InputStream getInputStreamFromResponse(String url) throws IOException {

    final ResponseExtractor<InputStream> responseExtractor =
            clientHttpResponse -> clientHttpResponse.getBody();

    return restTemplate.execute(url, HttpMethod.GET, null, responseExtractor);
}

在您访问它之前,InputStream 将会被关闭。


5

我通过这种方法解决了问题,希望它能帮助到大家。

    @GetMapping("largeFile")
    public ResponseEntity<InputStreamResource> downloadLargeFile(
            @RequestParam("fileName") String fileName
    ) throws IOException {

        RestTemplate restTemplate = new RestTemplate();

        // Optional Accept header
        RequestCallback requestCallback = request -> request.getHeaders()
                .setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL));

        // Streams the response instead of loading it all in memory
        ResponseExtractor<InputStreamResource> responseExtractor = response -> {
            // Here I write the response to a file but do what you like
            Path path = Paths.get("tmp/" + fileName);
            Files.copy(response.getBody(), path, StandardCopyOption.REPLACE_EXISTING);
            return new InputStreamResource(new FileInputStream(String.format("tmp/%s", fileName)));
        };

        InputStreamResource response = restTemplate.execute(
            String.format("http://%s:%s/file/largeFileRestTemplate?fileName=%s", host, "9091", fileName),
            HttpMethod.GET,
            requestCallback,
            responseExtractor
        );

        return ResponseEntity
            .ok()
            .header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=%s", fileName))
            .body(response);
    }

1
当请求体由消息转换器处理时,它最终将成为ByteArrayInputStream并在内存中加载。您的示例并未解决核心问题,即在创建请求时未加载内存,这是一个好的部分。 - DV Singh
@DVSingh,感谢分享。我并不是说你的评论有错。我已经使用并对我发布的代码进行了充分测试。它实际上解决了问题。 - Spring
4
它被加载到内存中,但这并不意味着整个容量都被加载。它通过数据流逐字节逐块地加载。 - Spring

4
感谢 Abhijit Sarkar 的回答指引。
我需要下载一个庞大的 JSON 流并将其分成易于处理的小数据流。该 JSON 由具有大属性的对象组成:这些大属性可以序列化到文件中,从而从未解组的 JSON 对象中删除。
另一个用例是逐个对象下载 JSON 流,像 map/reduce 算法一样对其进行处理并生成单个输出,而无需加载整个流到内存中。
还有另一个用例是读取一个大型 JSON 文件,并根据条件仅选择少量对象,同时解组为普通的 Java 对象。
下面是一个示例:我们想要流式传输一个非常巨大的 JSON 文件,它是一个数组,并且我们只想检索数组中的第一个对象。
给定位于服务器上的此大文件,可在http://example.org/testings.json获取:
[
    { "property1": "value1", "property2": "value2", "property3": "value3" },
    { "property1": "value1", "property2": "value2", "property3": "value3" },
    ... 1446481 objects => a file of 104 MB => take quite long to download...
]

这个JSON数组的每一行都可以被解析为这个对象:

@lombok.Data
public class Testing {
    String property1;
    String property2;
    String property3;
}

你需要这个类来使解析代码可重用:
import com.fasterxml.jackson.core.JsonParser;
import java.io.IOException;
@FunctionalInterface
public interface JsonStreamer<R> {
    /**
     * Parse the given JSON stream, process it, and optionally return an object.<br>
     * The returned object can represent a downsized parsed version of the stream, or the result of a map/reduce processing, or null...
     *
     * @param jsonParser the parser to use while streaming JSON for processing
     * @return the optional result of the process (can be {@link Void} if processing returns nothing)
     * @throws IOException on streaming problem (you are also strongly encouraged to throw HttpMessageNotReadableException on parsing error)
     */
    R stream(JsonParser jsonParser) throws IOException;
}

并且这个类用于解析:
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import lombok.AllArgsConstructor;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

@AllArgsConstructor
public class StreamingHttpMessageConverter<R> implements HttpMessageConverter<R> {

    private final JsonFactory factory;
    private final JsonStreamer<R> jsonStreamer;

    @Override
    public boolean canRead(Class<?> clazz, MediaType mediaType) {
        return MediaType.APPLICATION_JSON.isCompatibleWith(mediaType);
    }

    @Override
    public boolean canWrite(Class<?> clazz, MediaType mediaType) {
        return false; // We only support reading from an InputStream
    }

    @Override
    public List<MediaType> getSupportedMediaTypes() {
        return Collections.singletonList(MediaType.APPLICATION_JSON);
    }

    @Override
    public R read(Class<? extends R> clazz, HttpInputMessage inputMessage) throws IOException {
        try (InputStream inputStream = inputMessage.getBody();
             JsonParser parser = factory.createParser(inputStream)) {
            return jsonStreamer.stream(parser);
        }
    }

    @Override
    public void write(R result, MediaType contentType, HttpOutputMessage outputMessage) {
        throw new UnsupportedOperationException();
    }

}

接下来是用于流式HTTP响应、解析JSON数组并返回仅第一个未解组对象的代码:

// You should @Autowire these:
JsonFactory jsonFactory = new JsonFactory();
ObjectMapper objectMapper = new ObjectMapper();
RestTemplateBuilder restTemplateBuilder = new RestTemplateBuilder();

// If detectRequestFactory true (default): HttpComponentsClientHttpRequestFactory will be used and it will consume the entire HTTP response, even if we close the stream early
// If detectRequestFactory false: SimpleClientHttpRequestFactory will be used and it will close the connection as soon as we ask it to

RestTemplate restTemplate = restTemplateBuilder.detectRequestFactory(false).messageConverters(
    new StreamingHttpMessageConverter<>(jsonFactory, jsonParser -> {

        // While you use a low-level JsonParser to not load everything in memory at once,
        // you can still profit from smaller object mapping with the ObjectMapper
        if (!jsonParser.isClosed() && jsonParser.nextToken() == JsonToken.START_ARRAY) {
            if (!jsonParser.isClosed() && jsonParser.nextToken() == JsonToken.START_OBJECT) {
                return objectMapper.readValue(jsonParser, Testing.class);
            }
        }
        return null;

    })
).build();

final Testing firstTesting = restTemplate.getForObject("http://example.org/testings.json", Testing.class);
log.debug("First testing object: {}", firstTesting);

1
你确定这段代码能够处理整个文件吗?一旦你退出 try 块,响应流就会关闭,因此看起来你会得到一个异常。 - Abhijit Sarkar
这就是为什么整个解析在 try 块结束前完成。创建新的 StreamingHttpMessageConverter<>(jsonFactory, jsonStreamer) 时,作为第二个参数传递的 JsonStreamer lambda 将在 read(...) 方法中使用:整个 JsonStreamer.stream(...) 方法/lambda 将在 try 块内调用,并且流处于打开状态。是的,一旦我们结束 try/read(),流就关闭了(当 detectRequestFactory=false 时)。 - Sebien
我明白了,我没有注意到你正在返回 R,所以显然你必须完全消耗流并反序列化为 R - Abhijit Sarkar
如何遍历所有对象 - user1862354

4
你可以传入自己的响应提取器。以下是一个示例,我将json以流式方式写到磁盘上 -
        RestTemplate restTemplate = new RestTemplateBuilder().basicAuthentication("user", "their_password" ).build();

        int responseSize = restTemplate.execute(uri,
            HttpMethod.POST,
            (ClientHttpRequest requestCallback) -> {
                requestCallback.getHeaders().setContentType(MediaType.APPLICATION_JSON);
                requestCallback.getBody().write(body.getBytes());
            },
            responseExtractor -> {
                FileOutputStream fos  = new FileOutputStream(new File("out.json"));
                return StreamUtils.copy(responseExtractor.getBody(), fos);
            }
    )

1
这实际上运行得非常好。如果您正在尝试从一个服务流式传输响应到另一个服务,您可以将此流复制到您的HttpServletResponse中,或者像您提到的那样将其复制到文件中,然后将其作为本地文件处理。 - rpgFANATIC

-1
作为一种选择,您可以将响应作为字节消耗,然后转换为流。
byte data[] = restTemplate.execute(link, HttpMethod.GET, null, new BinaryFileExtractor());
return new ByteArrayInputStream(data);

Extractor 是

public class BinaryFileExtractor implements ResponseExtractor<byte[]> {

  @Override
  public byte[] extractData(ClientHttpResponse response) throws IOException {
    return ByteStreams.toByteArray(response.getBody());
  }
}

4
这正是他们试图避免的。如果你将其读入为 byte[],就会将整个文件加载到内存中,不是流式处理。如果在将整个文件加载到 RAM 后再进行流式处理,那么你就同时拥有了最糟糕的两种选择。 - BrianC

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接