使用Java InputStream多次读取大量数据

3
我希望了解在Java中从InputStream多次读取字节的最佳方法,并且当流非常大时仍然高效。 假设我有以下代码:
public void handleBytes(InputStream in) {
    doStuff1(in);
    doStuff2(in);
    doStuff3(in);
}

这里有三个函数doStuff1doStuff2doStuff3,需要对同一组字节进行操作但是功能不同。另外我假设这些函数可以异步执行。

我知道可以使用markreset来处理流,但我想知道当in包含大量数据时,是否适用这种方式。而且如果我想为每个doStuff-X方法设置一个线程工作器,就不能使用reset了。

那我应该为每个doStuff-X方法复制一份流吗?但是我也不确定对于大量数据这是否会有效率。


你需要提供更多关于 doStuff 方法的信息。 - Raedwald
如果输入流不太大,那么只需将字节加载到内存中(如果可以),然后对主字节集运行各种进程。 - ManoDestra
4个回答

1
你可以采用PipedOutputStream和PipedInputStream。
static class Task extends Thread{
    private final String taskName;
    private final BufferedInputStream input;
    public Task(String taskName, PipedInputStream input){
        this.taskName = taskName;
        this.input = new BufferedInputStream( input);
    }

    public void run(){
        try {
            System.out.println("Thread "+this.taskName+" Start");

            final byte buf[] = new byte[8]; // 8 bytes for demo
            while(true){
                if( input.available() > 0){
                    input.read(buf);
                    System.out.println(String.format("Task Name %s, read:%s", this.taskName, new String(buf)));
                }
                else{
                    // TODO: Set break Condition:Ex: Check the expected read size
                    Thread.sleep(1000);
                }
            }
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
public static void main(String args[]) {
    try{
        final PipedInputStream input1 = new PipedInputStream();
        final PipedInputStream input2 = new PipedInputStream();
        final PipedInputStream input3 = new PipedInputStream();

        final Task t1 = new Task("Task1", input1);
        final Task t2 = new Task("Task2", input2);
        final Task t3 = new Task("Task3", input3);
        t1.start();
        t2.start();
        t3.start();

        Thread.sleep(300);

        InputStream input = null;
        try{
            input = new FileInputStream("LargeInputFile.txt");

            final PipedOutputStream out1 = new PipedOutputStream(input1);
            final PipedOutputStream out2 = new PipedOutputStream(input2);
            final PipedOutputStream out3 = new PipedOutputStream(input3);

            byte buf[] = new byte[8]; // 8 bytes for demo
            while(true){

                if(input.available()>0){
                    int size = input.read(buf);

                    if(size > 0){
                        out1.write(buf);
                        out2.write(buf);
                        out3.write(buf);
                        out1.flush();
                        out2.flush();
                        out3.flush();
                    }                       
                }
                else{
                    System.out.println("Rread is finished!");
                    break;
                }
            }
        }
        finally{
            if(input!=null){
                input.close();
            }
        }   
        t1.join();
        t2.join();
        t3.join();
    }
    catch(Exception e){
        e.printStackTrace(System.err);
    }
}

1

如果没有对整个输入进行缓冲,只能一次读取InputStream。

如果它是GB或更大的文件,则可以将其加载到内存中,或者将其复制到文件并在需要时重放。如果您可以在一个线程中解析数据,则可以将其传递给其他线程。


1
将它复制到文件似乎是一个非常简单的方法。但是在我可以在doStuffs方法中处理它们之前,我需要先将所有字节写入文件。 - Seb

1
一般来说,这似乎不是一个好主意。mark并不能保证流支持,并且即使它被支持,你也必须指定在调用reset之前可以读取多少字节。
既然你提到那些dostuff可以异步运行,为什么不为每个dostuff启动一个线程,并使用队列将输入从主线程并发地传递到这三个队列中呢?这需要一些同步,但这样你就没有输入量的限制,仍然可以限制内存使用。

1
如果你知道三个doStuff()函数是异步运行的,那么你可以尝试使用Apache Commons IO TeeInputStream将初始InputStream的内容复制到连接到doStuff2()正在读取的PipedInputStreamPipedOutputStream。同样,你可以设置第二个TeeInputStream,它使用第二个PipedOutputStream构造,并连接到第二个PipedInputStream,以用于doStuff3()。
这种方法有一些限制:
1)doStuff1()、doStuff2()和doStuff3()必须在单独的线程上运行,否则在doStuff1()运行并在doStuff2()和doStuff3()运行之前,你将缓冲整个文件两次。该方法假定doStuff2()和doStuff3()在doStuff1()最初读取数据时正在读取和处理数据。

2) doStuff1()不能使用skip()、mark()或reset(),因为这会破坏下游函数(如TeeInputStream javadoc中所述)。

只要所有三个doStuff()函数能够以大约相同的速度处理数据,这种方法应该是相当内存有效的。


这似乎实现了我想要的。但是,在你的第一点中,当你说该方法假设doStuff2和doStuff3正在读取和处理数据,而doStuff1最初正在读取数据时,你是什么意思?为什么会这样? - Seb
如果这三个函数不是异步运行的(即在单独的线程中),那么doStuff2()和doStuff3()将在doStuff1()完成后才会运行。如果它们没有在doStuff1运行时运行并从PipedInputStream读取数据,那么数据将在管道中累积(虽然它可能会被写入磁盘,但本质上位于内存中)。由于有两个管道,您将存储两倍的数据。您表示数据“非常大”,因此我假设您不希望在内存中有两份副本。 - Rob McDougall
这就是为什么在 doStuff1() 运行时,让 doStuff2()/doStuff3() 函数同时运行非常重要。这样,在 doStuff1() 向管道中添加数据的同时,这两个函数(doStuff2()/doStuff3())可以同时从管道中读取数据(并清除它)。这有帮助吗? - Rob McDougall
哦,我明白了,这很有道理。感谢你的回答。 - Seb

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接