从OutputStream创建InputStream的最有效方法

93

本页:http://blog.ostermiller.org/convert-java-outputstream-inputstream描述了如何从OutputStream创建一个InputStream:

new ByteArrayInputStream(out.toByteArray())

其他的选择是使用管道流和新线程,但这种方法很繁琐。

我不喜欢将许多兆字节复制到新的内存字节数组中的想法。是否有一个更有效率的库可以实现这一点?

编辑:

在Laurence Gonsalves的建议下,我尝试了使用PipedStreams,并发现它们并不难处理。以下是Clojure示例代码:

(defn #^PipedInputStream create-pdf-stream [pdf-info]
  (let [in-stream (new PipedInputStream)
        out-stream (PipedOutputStream. in-stream)]
    (.start (Thread. #(;Here you write into out-stream)))
    in-stream))
5个回答

77
如果您不想一次性将所有数据复制到内存缓冲区中,那么必须让使用OutputStream的代码(生产者)和使用InputStream的代码(消费者)在同一线程中交替运行,或者在两个单独的线程中并发运行。让它们在同一线程中操作可能比使用两个单独的线程要复杂得多,容易出错(您需要确保消费者永远不会阻塞等待输入,否则您将会遇到死锁),而且需要在同一循环中运行生产者和消费者,这似乎耦合度太高了。
所以请使用第二个线程。这真的不是很复杂。您链接的页面中有一个合理的示例。下面是一个稍微现代化的版本,也关闭了流:
try (PipedInputStream in = new PipedInputStream()) {
    new Thread(() -> {
        try (PipedOutputStream out = new PipedOutputStream(in)) {
            writeDataToOutputStream(out);
        } catch (IOException iox) {
            // handle IOExceptions
        }
    }).start();
    processDataFromInputStream(in);
}

8
Stephen说:在内容被写出来之前,你无法阅读。因此,只使用一个线程时,你需要先将所有内容都写出来(这会创建一个大的内存数组,而Vagif想要避免这种情况),或者让它们交替进行,非常小心地确保阅读者永远不会阻塞等待输入(因为如果这样做了,编写者也永远无法执行)。 - Laurence Gonsalves
@Laurence - 尽管看着你的声望好像不需要,但还是给你点赞!只是想说这个解释得非常好!(包括回答和评论) - Amol Katdare
1
这个建议在 JEE 环境中使用安全吗?因为容器可能正在运行很多自己的线程。 - Toskan
3
如果因为某种原因在您的容器中使用new Thread不合适,那么请查看是否有可用的线程池可以使用。 - Laurence Gonsalves
1
@LaurenceGonsalves,如果您不关闭PipedOutputStream,则读取器将无限期地阻塞,或者如果写入线程已停止,则会抛出异常,请参见https://dev59.com/bW035IYBdhLWcg3wYe8F#29725367。还应关闭PipedInputStream以确保如果(由于某种原因)编写器再次写入PipedOutputStream,则在缓冲区已满且等待空间时不会无限期地阻塞。 - Marcono1234
显示剩余8条评论

15

还有另一个名为EasyStream的开源库,以透明的方式处理管道和线程。如果一切顺利,这并不是很复杂。当问题出现时(参考Laurence Gonsalves的示例),情况就会变得复杂。

class1.putDataOnOutputStream(out);

抛出异常。在该示例中,线程仅完成且异常丢失,而外部的 InputStream 可能会被截断。

Easystream处理异常传播和其他我已经调试了一年左右的麻烦问题。(我是库的维护者:显然我的解决方案是最好的;)) 以下是如何使用它的示例:

final InputStreamFromOutputStream<String> isos = new InputStreamFromOutputStream<String>(){
 @Override
 public String produce(final OutputStream dataSink) throws Exception {
   /*
    * call your application function who produces the data here
    * WARNING: we're in another thread here, so this method shouldn't 
    * write any class field or make assumptions on the state of the outer class. 
    */
   return produceMydata(dataSink)
 }
};

还有一个很好的介绍,解释了所有将OutputStream转换为InputStream的其他方法。值得一看。


1
使用他们的类的教程可在https://code.google.com/p/io-tools/wiki/Tutorial_EasyStream找到。 - koppor

13

避免复制缓冲区的简单解决方案是创建一个专用的ByteArrayOutputStream

public class CopyStream extends ByteArrayOutputStream {
    public CopyStream(int size) { super(size); }

    /**
     * Get an input stream based on the contents of this output stream.
     * Do not use the output stream after calling this method.
     * @return an {@link InputStream}
     */
    public InputStream toInputStream() {
        return new ByteArrayInputStream(this.buf, 0, this.count);
    }
}

按需向上述输出流写入内容,然后调用 toInputStream 方法以获取基础缓冲区上的输入流。在那一点之后,请将输出流视为关闭。


1
请注意:如果数据在没有任何阻塞操作的情况下生成(避免线程开销),则此方法更有效。否则,使用PipedStreams将更加高效。 - morgwai

7

我认为将InputStream连接到OutputStream的最佳方式是通过java.io包中提供的管道流,具体如下:

// 1- Define stream buffer
private static final int PIPE_BUFFER = 2048;

// 2 -Create PipedInputStream with the buffer
public PipedInputStream inPipe = new PipedInputStream(PIPE_BUFFER);

// 3 -Create PipedOutputStream and bound it to the PipedInputStream object
public PipedOutputStream outPipe = new PipedOutputStream(inPipe);

// 4- PipedOutputStream is an OutputStream, So you can write data to it
// in any way suitable to your data. for example:
while (Condition) {
     outPipe.write(mByte);
}

/*Congratulations:D. Step 4 will write data to the PipedOutputStream
which is bound to the PipedInputStream so after filling the buffer
this data is available in the inPipe Object. Start reading it to
clear the buffer to be filled again by the PipedInputStream object.*/

在我看来,这段代码有两个主要优点:

1 - 除了缓冲区之外,没有额外的内存消耗。

2 - 无需手动处理数据排队。


6
这将是很棒的,但是javadocs指出,如果您在同一个线程中读写这些管道,可能会导致死锁。希望他们能使用NIO进行更新! - Nate Glenn

2

我通常会尽量避免创建单独的线程,因为这会增加死锁的机会、增加理解代码的难度以及处理异常的问题。

以下是我的解决方案:使用一个名为ProducerInputStream的流,通过反复调用produceChunk()方法来创建内容块:

public abstract class ProducerInputStream extends InputStream {

    private ByteArrayInputStream bin = new ByteArrayInputStream(new byte[0]);
    private ByteArrayOutputStream bout = new ByteArrayOutputStream();

    @Override
    public int read() throws IOException {
        int result = bin.read();
        while ((result == -1) && newChunk()) {
            result = bin.read();
        }
        return result;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int result = bin.read(b, off, len);
        while ((result == -1) && newChunk()) {
            result = bin.read(b, off, len);
        }
        return result;
    }

    private boolean newChunk() {
        bout.reset();
        produceChunk(bout);
        bin = new ByteArrayInputStream(bout.toByteArray());
        return (bout.size() > 0);
    }

    public abstract void produceChunk(OutputStream out);

}

有趣的想法,但遗憾的是,只有在您控制生成数据的代码时才能起作用。如果另一个第三方库向OutputStream写入GB级别的数据而不返回控制,则最好将所有内容复制到内存中,这违背了此类的初衷。 - Jasper Citi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接