使用ResponseEntity进行流传输的正确方式,确保InputStream被关闭。

31

我们的一个应用程序泄露了文件句柄,但我们尚未找到原因。

在代码中,我看到有几个类似于以下函数的函数:

public ResponseEntity<InputStreamResource> getFoo( ... ) {
    InputStream content = getContent(...)
    InputStreamResource isr = new InputStreamResource(content);
    return ResponseEntity.status(HttpServletResponse.SC_OK).body(isr);
}

(由于简洁起见,省略了if检查和try/catch)

我确信这一段代码引起了问题,因为当我使用JMeter对特定代码进行负载测试时,我能看到在此阶段getContent()失败了:

is = Files.newInputStream(f.toPath());

通常我会关闭InputStream,但由于这段简短的代码,我无法在returnbody调用之前关闭流。

当我运行lsof(该代码在Linux上运行)时,我可以看到有数千个文件以读模式打开。因此,我确定这个问题是由于流没有被关闭引起的。

是否有最佳实践代码我应该交换?


只是猜测,您能检查泄漏是否发生在HEAD请求或GET请求中吗? - reith
据我所见,这仅会在使用GET时发生,而不会在使用HEAD时发生。 - Marged
好的,我的猜测是由于隐式 HEAD 不会消耗正文,可能会导致泄漏。但事实并非如此。 - reith
4个回答

58

您可以尝试使用StreamingResponseBody。

StreamingResponseBody

这是一种控制器方法返回值类型,用于异步请求处理,应用程序可以直接写入响应OutputStream而不会阻塞Servlet容器线程。

由于您正在单独的线程上工作,并直接向响应写入数据,因此在return之前调用close()方法的问题已得到解决。

您可以从以下示例开始尝试:

public ResponseEntity<StreamingResponseBody> export(...) throws FileNotFoundException {
    //...

    InputStream inputStream = new FileInputStream(new File("/path/to/example/file"));


    StreamingResponseBody responseBody = outputStream -> {

        int numberOfBytesToWrite;
        byte[] data = new byte[1024];
        while ((numberOfBytesToWrite = inputStream.read(data, 0, data.length)) != -1) {
            System.out.println("Writing some bytes..");
            outputStream.write(data, 0, numberOfBytesToWrite);
        }

        inputStream.close();
    };

    return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=generic_file_name.bin")
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(responseBody);
}

您也可以尝试使用Files(自Java 7以来)

这样您就不必管理InputStream

    File file = new File("/path/to/example/file");

    StreamingResponseBody responseBody = outputStream -> {
        Files.copy(file.toPath(), outputStream);
    };

正如@Stackee007在评论中所描述的,在生产环境下在重负载下,定义一个@Configuration类用于调整参数和管理Async进程是一个好的实践。

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

    private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final TaskExecutionProperties taskExecutionProperties;

    public AsyncConfiguration(TaskExecutionProperties taskExecutionProperties) {
        this.taskExecutionProperties = taskExecutionProperties;
    }

    //  ---------------> Tune parameters here
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(taskExecutionProperties.getPool().getCoreSize());
        executor.setMaxPoolSize(taskExecutionProperties.getPool().getMaxSize());
        executor.setQueueCapacity(taskExecutionProperties.getPool().getQueueCapacity());
        executor.setThreadNamePrefix(taskExecutionProperties.getThreadNamePrefix());
        return executor;
    }
    
    //  ---------------> Use this task executor also for async rest methods
    @Bean
    protected WebMvcConfigurer webMvcConfigurer() {
        return new WebMvcConfigurer() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setTaskExecutor(getTaskExecutor());
            }
        };
    }

    @Bean
    protected ConcurrentTaskExecutor getTaskExecutor() {
        return new ConcurrentTaskExecutor(this.getAsyncExecutor());
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

如何使用mockMvc进行测试

您可以在集成测试中按照以下示例代码进行操作:

    .andExpect(request().asyncStarted())
    .andDo(MvcResult::getAsyncResult)
    .andExpect(status().isOk()).getResponse().getContentAsByteArray();

ResponseEntity<StreamingResponseBody> 的内容类型在这个例子中是 MediaType.APPLICATION_OCTET_STREAM,你可以通过调用 .getContentAsByteArray() 方法获取字节数组,但是根据响应体的内容类型不同,你也可以获取到 String/Json/纯文本等其它格式的内容。


这看起来很有前途。你想通过添加更多信息来加强它吗? - Marged
请注意使用StreamingBodyResponse时的注意事项:“注意:当使用此选项时,强烈建议显式配置Spring MVC中用于执行异步请求的TaskExecutor。MVC Java配置和MVC命名空间都提供了配置异步处理的选项。如果不使用这些选项,应用程序可以设置RequestMappingHandlerAdapter的taskExecutor属性。” - Stackee007
1
我已经编写了一个使用mockMvc集成测试的小例子。 - ValerioMC
是否可以在Jdk 1.6中使用StreamingBodyResponse?我有一个需要使用异步响应流的旧项目。 - fiddle
1
andDo(MvcResult::getAsyncResult) 存在一个小陷阱 - 如果 StreamingResponseBody lambda 抛出了 Exception/Throwable,集成测试将会忽略这个异常。以下代码更长,但如果有问题则会导致测试失败:.andDo(r -> { var result = r.getAsyncResult(TIMEOUT_MS); if (result instanceof Throwable e) { throw new RuntimeException(e); } }) - seanf
显示剩余3条评论

7
假设您正在使用Spring,您的方法可以返回Resource,然后让Spring处理其余部分(包括关闭底层流)。在Spring API中有少量资源实现可用,否则您需要自己实现。最终,您的方法将变得简单,类似于以下内容。
public ResponseEntity<Resource> getFo0(...) {
    return new InputStreamResource(<Your input stream>);
}

ResponseEntity 应该使用 InputStreamResource,而不是 Resource,对吗? - cogitoboy

4
你可以重构所有读取本地文件并将其内容设置为HTTP响应主体的控制器方法:
不使用ResponseEntity的方法,而是注入底层的HttpServletResponse,并将从getContent(...)方法返回的输入流的字节复制到HttpServletResponse的输出流中,例如使用Apache CommonsIO或Google Guava库的IO相关实用程序方法。无论如何,请确保关闭输入流!下面的代码通过使用“try-with-resources”语句隐式执行此操作,该语句在语句的末尾关闭声明的输入流。
@RequestMapping(value="/foo", method=RequestMethod.GET)
public void getFoo(HttpServletResponse response) {
    // use Java7+ try-with-resources
    try (InputStream content = getContent(...)) {

        // if needed set content type and attachment header
        response.addHeader("Content-disposition", "attachment;filename=foo.txt");
        response.setContentType("txt/plain");

        // copy content stream to the HttpServletResponse's output stream
        IOUtils.copy(myStream, response.getOutputStream());

        response.flushBuffer();
    }
}

参考文献:

https://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html https://google.github.io/guava/releases/19.0/api/docs/com/google/common/io/ByteStreams.html https://commons.apache.org/proper/commons-io/javadocs/api-release/index.html

(特别是查看org.apache.commons.io.IOUtils类的public static int copy(InputStream input, OutputStream output) throws IOExceptionpublic static int copyLarge(InputStream input, OutputStream output) throws IOException方法)


1
我更喜欢@Marged的方法(如果它没有我不知道的任何警告),因为它更简洁,而且在所有端点中使用ReponseEntity可能更加一致。尽管如此,在几个应用程序中我们一直在使用Tommy的方法,而且它运行得非常顺畅。 - Jan B.
1
Spring的FileCopyUtils同样可以使用 - 它会为您关闭流,因此与Apache CommonsIO或Google Guava库的IO相关实用程序方法相比,您需要记住的事情要少一些。 - Krzysztof Skrzynecki
我理解 StreamingResponseBody 相对于使用 HttpServletResponse#getOutputStream 的主要优势是可以启动 TaskExecutor,因此写入流数据不会阻塞 Servlet 容器线程。我只是想了解这两者之间的区别。 - Krzysztof Skrzynecki

3

因为这个InputStream基本上来自一个简单的文件,一个很好的替代方案是这段代码:

FileSystemResource fsr = new FileSystemResource(fileName);
return ResponseEntity.status(HttpServletResponse.SC_OK).body(fsr);

FileSystemResource 可以接收java.util.Filejava.nio.file.Path或指向相关文件的 String


这个处理资源关闭和不同编码(如ISO Latin 1)吗? - NiharGht

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接