Read input stream twice

164
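How do you read the same input stream twice? Is it possible to copy it somehow? I need to fetch an image from the web, save it locally, and then return the saved image. I just thought it would be faster to use the same stream than to open a new stream to the downloaded content and read it again.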

2
Maybe use mark and reset. - Vyacheslav Shylkin
11 Answers

151

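You can use org.apache.commons.io.IOUtils.copy to copy the contents of the InputStream into a ByteArrayOutputStream, take its byte array, and then read from that array as many times as you need with ByteArrayInputStream. For example: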

ByteArrayOutputStream baos = new ByteArrayOutputStream();
org.apache.commons.io.IOUtils.copy(in, baos);
byte[] bytes = baos.toByteArray();

// either
while (needToReadAgain) {
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    yourReadMethodHere(bais);
}

// or
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
while (needToReadAgain) {
    bais.reset();
    yourReadMethodHere(bais);
}
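
If commons-io is not on the classpath, the same idea can be sketched with the JDK alone. This is a minimal sketch, assuming Java 9+ for InputStream.readAllBytes(); the class name ReplayableBytes and the method buffer are hypothetical names chosen for illustration. As with the snippet above, the whole content is held in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

final class ReplayableBytes {
    /** Drains {@code in} once and returns a factory for fresh streams over the same bytes. */
    static Supplier<InputStream> buffer(InputStream in) throws IOException {
        byte[] bytes = in.readAllBytes(); // Java 9+; keeps the entire content in memory
        return () -> new ByteArrayInputStream(bytes);
    }

    public static void main(String[] args) throws IOException {
        // A ByteArrayInputStream stands in for the real source (for example a network stream).
        InputStream source = new ByteArrayInputStream("payload".getBytes(StandardCharsets.UTF_8));
        Supplier<InputStream> replay = buffer(source);

        // Each call to get() starts again from the first byte.
        System.out.println(new String(replay.get().readAllBytes(), StandardCharsets.UTF_8));
        System.out.println(new String(replay.get().readAllBytes(), StandardCharsets.UTF_8));
    }
}
```

Handing out a Supplier rather than a single stream means each consumer gets its own read position, so no reset() call is needed.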

3
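@Paul Grime: IOUtils.toByteArray internally calls the copy method as well. - Ankit
5
As @Ankit says, this solution does not work for me, since the input is read internally and cannot be reused. - Aritz
57
I know this comment is late, but in the first option, if you read the input stream into a byte array, doesn't that mean you are loading all of the data into memory? That could be a big problem if you are loading something like large files. - jaxkodex
2
@jaxkodex, yes, that is correct. If you as the developer know more about the actual type of streams you are dealing with, you can write more appropriate custom behaviour. The answer supplied is a general abstraction. - Paul Grime
4
You can use IOUtils.toByteArray(InputStream) to get the byte array in a single call. - akd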

40

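Depending on where the InputStream comes from, you may not be able to reset it. You can check whether mark() and reset() are supported by calling markSupported().

If they are, you can call reset() on the InputStream to return to the marked position (the beginning, if you marked it before reading anything). If not, you need to read the InputStream from the source again.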
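
The check described above can be sketched as a small helper. This is only an illustration: the class MarkSupportCheck and the method tryRewind are hypothetical names, and PushbackInputStream is used in the demo simply because it is documented as not supporting mark/reset. The sketch assumes Java 9+ for readAllBytes().

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;

final class MarkSupportCheck {
    /** Tries to rewind to the last mark; returns false when the caller must re-open the source. */
    static boolean tryRewind(InputStream in) {
        if (!in.markSupported()) {
            return false;
        }
        try {
            in.reset();
            return true;
        } catch (IOException e) {
            return false; // the mark was never set or has been invalidated
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream markable = new ByteArrayInputStream(new byte[] {1, 2, 3});
        markable.mark(3);          // remember the current position before reading
        markable.readAllBytes();   // consume the stream
        System.out.println("rewound: " + tryRewind(markable));

        InputStream notMarkable = new PushbackInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
        notMarkable.readAllBytes();
        System.out.println("rewound: " + tryRewind(notMarkable));
    }
}
```

When tryRewind returns false, the only remaining option is the one named above: obtain a new stream from the original source.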


1
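InputStream doesn't support 'mark' - you can call mark on an IS, but it does nothing. Likewise, calling reset on an IS will throw an exception. - ayahuasca
9
Subclasses of InputStream, such as BufferedInputStream, do support 'mark'. - Dzmitry Bahdanovich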

18

If your InputStream supports marking, you can call mark() on it and later reset() it. If your InputStream does not support marking, you can wrap it in a java.io.BufferedInputStream, like this:

    InputStream bufferedInputStream = new BufferedInputStream(yourInputStream);
    bufferedInputStream.mark(readLimit); // readLimit: the most bytes you will read before calling reset()
    // read your bufferedInputStream
    bufferedInputStream.reset();
    // read it again
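
To make the role of the value passed to mark() concrete, here is a minimal sketch. It assumes Java 11+ for readNBytes(int); BufferedMarkDemo and readTwice are hypothetical names. The initial buffer is deliberately tiny (4 bytes) so that the read limit, not the initial buffer size, is what bounds how far back reset() can go.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

final class BufferedMarkDemo {
    /** Reads up to {@code limit} bytes twice from the same stream using mark/reset. */
    static byte[][] readTwice(InputStream raw, int limit) throws IOException {
        InputStream in = new BufferedInputStream(raw, 4); // 4-byte initial buffer
        in.mark(limit);                        // promise: at most `limit` bytes before reset()
        byte[] first = in.readNBytes(limit);   // Java 11+
        in.reset();                            // back to the marked position
        byte[] second = in.readNBytes(limit);
        return new byte[][] {first, second};
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[100];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i;
        }
        byte[][] reads = readTwice(new ByteArrayInputStream(data), 50);
        System.out.println("same bytes both times: " + Arrays.equals(reads[0], reads[1]));
    }
}
```

Reading more than the read limit before calling reset() is outside what mark() promises, so the limit should cover everything you intend to re-read.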

2
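A buffered input stream can only mark back as far as the buffer size, so if the source does not fit, you cannot go all the way back to the beginning. - L. Blanc
@L.Blanc Sorry, but that doesn't seem correct. Take a look at BufferedInputStream.fill(): it has a "grow buffer" section in which the new buffer size is compared only to marklimit and MAX_BUFFER_SIZE. - eugene82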

9
You can wrap the input stream with a PushbackInputStream. PushbackInputStream lets you unread ("write back") bytes that have already been read, so you can do something like this:
public class StreamTest {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's wrap it with PushBackInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new PushbackInputStream(originalStream, 10); // 10 means that at most 10 bytes can be "written back" to the stream

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    ((PushbackInputStream) wrappedStream).unread(readBytes, 0, readBytes.length);

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3


  }

  private static byte[] getBytes(InputStream is, int howManyBytes) throws IOException {
    System.out.print("Reading stream: ");

    byte[] buf = new byte[howManyBytes];

    int next = 0;
    for (int i = 0; i < howManyBytes; i++) {
      next = is.read();
      if (next >= 0) { // read() returns -1 at end of stream; 0 is a valid byte
        buf[i] = (byte) next;
      }
    }
    return buf;
  }

  private static void printBytes(byte[] buffer) throws IOException {
    System.out.print("Reading stream: ");

    for (int i = 0; i < buffer.length; i++) {
      System.out.print(buffer[i] + " ");
    }
    System.out.println();
  }


}

Note that PushbackInputStream keeps an internal buffer of bytes, so it really does create an in-memory buffer that holds the bytes "written back".
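
A typical use of this pushback buffer is peeking at a file header before handing the stream to another reader. The sketch below is only an illustration, assuming Java 9+ for readNBytes(byte[], int, int); PngSniffer and looksLikePng are hypothetical names, and the caller is assumed to have created the PushbackInputStream with a pushback size of at least 8.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.Arrays;

final class PngSniffer {
    // The 8-byte signature that starts every PNG file.
    private static final byte[] PNG_SIGNATURE = {(byte) 0x89, 'P', 'N', 'G', '\r', '\n', 0x1A, '\n'};

    /** Peeks at the header; {@code in} needs a pushback buffer of at least 8 bytes. */
    static boolean looksLikePng(PushbackInputStream in) throws IOException {
        byte[] header = new byte[PNG_SIGNATURE.length];
        int n = in.readNBytes(header, 0, header.length); // Java 9+
        if (n > 0) {
            in.unread(header, 0, n); // push the bytes back so the next reader sees them
        }
        return n == header.length && Arrays.equals(header, PNG_SIGNATURE);
    }

    public static void main(String[] args) throws IOException {
        byte[] content = {(byte) 0x89, 'P', 'N', 'G', '\r', '\n', 0x1A, '\n', 42};
        PushbackInputStream in = new PushbackInputStream(new ByteArrayInputStream(content), 8);
        System.out.println("PNG header: " + looksLikePng(in));
        System.out.println("bytes still readable: " + in.readAllBytes().length);
    }
}
```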
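Knowing this approach, we can go further and combine it with FilterInputStream. FilterInputStream stores the original input stream as a delegate. This makes it possible to define a new class that "unreads" the original data automatically. The class is defined as follows: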
public class TryReadInputStream extends FilterInputStream {
  private final int maxPushbackBufferSize;

  /**
  * Creates a <code>FilterInputStream</code>
  * by assigning the  argument <code>in</code>
  * to the field <code>this.in</code> so as
  * to remember it for later use.
  *
  * @param in the underlying input stream, or <code>null</code> if
  *           this instance is to be created without an underlying stream.
  */
  public TryReadInputStream(InputStream in, int maxPushbackBufferSize) {
    super(new PushbackInputStream(in, maxPushbackBufferSize));
    this.maxPushbackBufferSize = maxPushbackBufferSize;
  }

  /**
   * Reads <code>length</code> bytes from the input stream into the given buffer. The bytes read remain available
   * in the stream.
   *
   * @param buffer the destination buffer into which the data is read
   * @param offset the start offset in the destination <code>buffer</code>
   * @param length how many bytes to read from the stream into the buffer. Must not exceed
   *        <code>maxPushbackBufferSize</code>, or an IOException is thrown
   *
   * @return number of bytes read
   * @throws java.io.IOException in case length is greater than <code>maxPushbackBufferSize</code>
   */
  public int tryRead(byte[] buffer, int offset, int length) throws IOException {
    validateMaxLength(length);

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int bytesRead = 0;

    int nextByte = 0;

    for (int i = 0; (i < length) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        buffer[offset + bytesRead++] = (byte) nextByte;
      }
    }

    if (bytesRead > 0) {
      ((PushbackInputStream) in).unread(buffer, offset, bytesRead);
    }

    return bytesRead;

  }

  public byte[] tryRead(int maxBytesToRead) throws IOException {
    validateMaxLength(maxBytesToRead);

    ByteArrayOutputStream baos = new ByteArrayOutputStream(); // as ByteArrayOutputStream to dynamically allocate internal bytes array instead of allocating possibly large buffer (if maxBytesToRead is large)

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int nextByte = 0;

    for (int i = 0; (i < maxBytesToRead) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        baos.write((byte) nextByte);
      }
    }

    byte[] buffer = baos.toByteArray();

    if (buffer.length > 0) {
      ((PushbackInputStream) in).unread(buffer, 0, buffer.length);
    }

    return buffer;

  }

  private void validateMaxLength(int length) throws IOException {
    if (length > maxPushbackBufferSize) {
      throw new IOException(
        "Trying to read more bytes than maxBytesToRead. Max bytes: " + maxPushbackBufferSize + ". Trying to read: " +
        length);
    }
  }

}

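This class has two methods. One reads into an existing buffer (its signature mirrors public int read(byte b[], int off, int len) of the InputStream class). The second returns a new buffer (this may be more efficient when the size of the buffer to read is unknown).

Now let's see our class in action: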

public class StreamTest2 {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's use our TryReadInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new TryReadInputStream(originalStream, 10);

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // NOTE: no manual call to "unread"(!) because TryReadInputStream handles this internally
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3);
    printBytes(readBytes); // prints 1 2 3

    // we can also call normal read which will actually read the bytes without "writing them back"
    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 4 5 6

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // now we can try read next bytes
    printBytes(readBytes); // prints 7 8 9

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 7 8 9


  }



}

9

How to split an InputStream in two, while avoiding loading all the data into memory, and then process the two independently:

  1. Create a couple of OutputStreams, specifically PipedOutputStream.
  2. Connect each PipedOutputStream to a PipedInputStream; these PipedInputStreams are the InputStreams that get returned.
  3. Connect the source InputStream to the OutputStreams just created, so that everything read from the source InputStream is written to both OutputStreams. There is no need to implement that, because it is already done in TeeInputStream (commons.io).
  4. In a separate thread, read the whole source InputStream; as a side effect, the input data is transferred to the target InputStreams.

    public static final List<InputStream> splitInputStream(InputStream input) 
        throws IOException 
    { 
        Objects.requireNonNull(input);      
    
        PipedOutputStream pipedOut01 = new PipedOutputStream();
        PipedOutputStream pipedOut02 = new PipedOutputStream();
    
        List<InputStream> inputStreamList = new ArrayList<>();
        inputStreamList.add(new PipedInputStream(pipedOut01));
        inputStreamList.add(new PipedInputStream(pipedOut02));
    
        TeeOutputStream tout = new TeeOutputStream(pipedOut01, pipedOut02);
    
        TeeInputStream tin = new TeeInputStream(input, tout, true);
    
        Executors.newSingleThreadExecutor().submit(tin::readAllBytes);  
    
        return Collections.unmodifiableList(inputStreamList);
    }
    

Remember to close the InputStreams once you have consumed them, and to shut down the thread that runs TeeInputStream.readAllBytes().

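If you need to split it into more than two InputStreams, replace the TeeOutputStream class in the splitInputStream snippet above with your own implementation, which wraps a List<OutputStream> and overrides the OutputStream methods: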

public final class TeeListOutputStream extends OutputStream {
    private final List<? extends OutputStream> branchList;

    public TeeListOutputStream(final List<? extends OutputStream> branchList) {
        Objects.requireNonNull(branchList);
        this.branchList = branchList;
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        for (OutputStream branch : branchList) {
            branch.write(b);
        }
    }

    @Override
    public void flush() throws IOException {
        for (OutputStream branch : branchList) {
            branch.flush();
        }
    }

    @Override
    public void close() throws IOException {
        for (OutputStream branch : branchList) {
            branch.close();
        }
    }
}
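
To see why step 4 needs its own thread, a dependency-free sketch of the same idea may help. It is not the commons-io implementation: PipeSplitSketch and split are hypothetical names, and the sketch assumes Java 9+. Nothing reads the source unless some thread pumps it, and a PipedInputStream has a small fixed buffer (1024 bytes by default), so the pump blocks whenever either pipe is full. For the same reason the two returned streams should be consumed concurrently; draining one completely before touching the other can stall on inputs larger than the pipe buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PipeSplitSketch {
    /** Returns two streams that each deliver every byte of {@code source}. */
    static List<InputStream> split(InputStream source) throws IOException {
        PipedOutputStream out1 = new PipedOutputStream();
        PipedOutputStream out2 = new PipedOutputStream();
        PipedInputStream in1 = new PipedInputStream(out1);
        PipedInputStream in2 = new PipedInputStream(out2);

        Thread pump = new Thread(() -> {
            // try-with-resources closes both pipes, so the readers see end of stream.
            try (source; out1; out2) {
                byte[] chunk = new byte[4096];
                int n;
                while ((n = source.read(chunk)) != -1) {
                    out1.write(chunk, 0, n); // blocks while the first pipe is full
                    out2.write(chunk, 0, n); // blocks while the second pipe is full
                }
            } catch (IOException e) {
                e.printStackTrace(); // e.g. a reader closed its end early
            }
        });
        pump.setDaemon(true);
        pump.start();
        return List.of(in1, in2);
    }

    public static void main(String[] args) throws Exception {
        List<InputStream> parts = split(new ByteArrayInputStream(new byte[10_000]));
        ExecutorService readers = Executors.newFixedThreadPool(2);
        // Each branch is consumed on its own thread.
        Future<byte[]> a = readers.submit(() -> parts.get(0).readAllBytes());
        Future<byte[]> b = readers.submit(() -> parts.get(1).readAllBytes());
        System.out.println(a.get().length + " and " + b.get().length + " bytes received");
        readers.shutdown();
    }
}
```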

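Could you please explain step 4 in more detail? Why do we have to trigger the reading manually? Why doesn't reading from either piped input stream trigger reading of the source input stream? And why do we make that call asynchronously? - Дмитрий Кулешов
1
To close the TeeOutputStream, I added tin.close in the thread: Executors.newSingleThreadExecutor().submit(() -> { try { tin.readAllBytes(); tin.close(); } catch (IOException ioException) { ioException.printStackTrace(); } }); - Tobias Otto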

7
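If you are using an implementation of InputStream, you can check the result of InputStream#markSupported(), which tells you whether you can use the mark()/reset() methods.

If you can mark the stream when you read, call reset() to go back to the beginning.

If you can't, you will have to open the stream again.

Another solution is to convert the InputStream to a byte array and then iterate over the array as many times as you need. You can find several solutions, with or without third-party libraries, in the post "Convert InputStream to byte array in Java". Be aware that if the content read is too big, you may run into memory problems.

Finally, if what you need is to read an image, use: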

BufferedImage image = ImageIO.read(new URL("http://www.example.com/images/toto.jpg"));

Using ImageIO#read(java.net.URL) also lets you take advantage of caching.
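
For the use case in the question (download once, save locally, return the image), the byte-array approach and ImageIO can be combined. The following is a minimal sketch, assuming Java 9+ and an image small enough to hold in memory; SaveAndDecode and saveAndDecode are hypothetical names. In real use the InputStream would come from the network, for example from URL.openStream().

```java
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.imageio.ImageIO;

final class SaveAndDecode {
    /** Saves the raw bytes to {@code target} and decodes the same bytes into an image. */
    static BufferedImage saveAndDecode(InputStream in, Path target) throws IOException {
        byte[] bytes = in.readAllBytes(); // single pass over the source stream
        Files.write(target, bytes);       // local copy, byte for byte
        // ImageIO.read returns null when no registered reader understands the format.
        return ImageIO.read(new ByteArrayInputStream(bytes));
    }

    public static void main(String[] args) throws IOException {
        // A generated 2x3 PNG stands in for the downloaded image.
        ByteArrayOutputStream png = new ByteArrayOutputStream();
        ImageIO.write(new BufferedImage(2, 3, BufferedImage.TYPE_INT_RGB), "png", png);

        Path target = Files.createTempFile("downloaded-", ".png");
        BufferedImage image = saveAndDecode(new ByteArrayInputStream(png.toByteArray()), target);
        System.out.println(image.getWidth() + "x" + image.getHeight() + " saved to " + target);
    }
}
```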

1
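When using ImageIO#read(java.net.URL), be aware that some web servers and CDNs may reject bare calls made by ImageIO#read (i.e. calls without a User-Agent that makes the server believe the request comes from a web browser). In that case, calling URL.openConnection(), setting the User-Agent on that connection, and then using ImageIO.read(InputStream) will usually do the trick. - Clint Eastwood
InputStream is not an interface. - bric3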

4

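If anyone is running a Spring Boot app and wants to read the response body of a RestTemplate (which is why I wanted to read a stream twice), there is a cleaner way of doing this.

First, use Spring's StreamUtils to copy the stream to a String:

String text = StreamUtils.copyToString(response.getBody(), Charset.defaultCharset());

But that's not all. You also need to use a request factory that can buffer the stream for you, like so: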

ClientHttpRequestFactory factory = new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
RestTemplate restTemplate = new RestTemplate(factory);

Or, if you are using a factory bean, you can do the following (this example is in Kotlin):

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun createRestTemplate(): RestTemplate = RestTemplateBuilder()
  .requestFactory { BufferingClientHttpRequestFactory(SimpleClientHttpRequestFactory()) }
  .additionalInterceptors(loggingInterceptor)
  .build()

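Source: https://objectpartners.com/2018/03/01/log-your-resttemplate-request-and-response-without-destroying-the-body/

The linked article explains how to log the contents of RestTemplate requests and responses without destroying the body. A custom interceptor logs the details of the request before it is sent and of the response after it returns. The article also includes a sample program that demonstrates the approach.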

3
How about:
if (!stream.markSupported()) {
    // let's replace the stream object
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    IOUtils.copy(stream, baos);
    stream.close();
    stream = new ByteArrayInputStream(baos.toByteArray());
    // now the stream should support 'mark' and 'reset'
}

8
This is a terrible idea. You put the whole stream contents in memory this way. - Niels Doucet
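
When memory is the concern raised here, one alternative is to spool the stream to a temporary file once and re-open that file for every read. This is only a sketch of that idea, not something proposed in the answer above; TempFileSpool and spool are hypothetical names, and Java 9+ is assumed for readAllBytes() in the demo.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class TempFileSpool {
    /** Copies the stream to a temporary file; the caller deletes the file when done. */
    static Path spool(InputStream in) throws IOException {
        Path tmp = Files.createTempFile("spool-", ".bin");
        // Files.copy streams the data to disk instead of building one large array.
        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
        return tmp;
    }

    public static void main(String[] args) throws IOException {
        // A ByteArrayInputStream stands in for a large, non-repeatable source.
        Path tmp = spool(new ByteArrayInputStream("large content".getBytes(StandardCharsets.UTF_8)));
        try (InputStream first = Files.newInputStream(tmp);
             InputStream second = Files.newInputStream(tmp)) {
            // Each stream opened on the file starts from the first byte.
            System.out.println(new String(first.readAllBytes(), StandardCharsets.UTF_8));
            System.out.println(new String(second.readAllBytes(), StandardCharsets.UTF_8));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

The trade-off is disk I/O in exchange for bounded memory, which also matches the question's goal of keeping a local copy.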

2

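Convert the input stream into bytes and pass them to the savefile function, where you assemble them back into an input stream. In the original function, use the same bytes for your other tasks.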


6
I'd say this is a bad idea; the resulting array could be huge and eat up the device's memory. - Kevin Parker

0
ByteArrayInputStream ins = new ByteArrayInputStream("Hello".getBytes());
System.out.println("ins.available() at beginning:: " + ins.available());
ins.mark(0);
// Read input stream for some operations
System.out.println("ins.available() after reading :: " + ins.available());
ins.reset();
System.out.println("ins.available() after resetting :: " + ins.available());
// ins is ready for reading once again.

Output of the above statements: ins.available() at beginning:: 1028 ins.available() after reading :: 0 ins.available() after resetting :: 1028 - user14864114

Content provided by Stack Overflow.