Java:InputStream上的并发读取

7
我看了一会儿,对这个问题有些疑惑。我希望能够同时读取输入流并分段读取它。这些段不相互影响,它们只是要从上传的文件中插入或更新到数据库的值。设置一个段大小并在转换和插入/更新之前跳过,然后再启动新线程处理,这样可以同时读取输入流吗?
本质上,该文件是一个ID列表(每行一个ID),虽然最好可以指定分隔符。有些文件可能非常大,因此我希望将数据处理和转换为片段,以便在插入/更新到数据库后可以释放JVM内存。是否可能做到这一点?如果可以,是否已经存在任何库来完成此操作?
谢谢您提前的帮助,
Alexei Blue.
3个回答

9
一个好的方法可能是使用一个单一的读取器来读取块,然后将每个块从线程池中的工作线程传递出去。考虑到这些将被插入到数据库中,与读取输入相比,插入操作将远远慢得多,因此单个线程应该足够进行读取。
以下是一个示例,它将System.in的每一行处理交给一个工作线程。如果您在单个事务中执行大量插入操作,则数据库插入的性能要好得多,因此传递一组包含1000行的数据比传递单个行更好。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static class Worker implements Runnable {
        private final String line;

        public Worker(String line) {
            this.line = line;
        }

        @Override
        public void run() {
            // Process line here.
            System.out.println("Processing line: " + line);
        }
    }

    public static void main(String[] args) throws IOException {
        // Create worker thread pool.
        ExecutorService service = Executors.newFixedThreadPool(4);

        BufferedReader buffer = new BufferedReader(new InputStreamReader(System.in));
        String line;

        // Read each line and hand it off to a worker thread for processing.
        while ((line = buffer.readLine()) != null) {
            service.execute(new Worker(line));
        }
    }
}

嗨Ed,感谢您的示例^.^,所以如果我将1000行读入StringBuffer,然后将其传递给工作线程进行处理并插入/更新数据库,您认为这是否是一个好方法? :) - Alexei Blue
最好将这1000行读入到一个List<String>String[]中。如果你将它们读入到一个StringBuffer中,那么它将成为一个单独的字符串,你需要再次解析出每一行。 - Ed Plese

2
首先,要从不同的偏移量并发地读取文件,您需要对文件进行随机访问,这意味着可以从任何位置读取文件。Java 可以使用 java.in 中的 RandomAccessFile 或 java.nio 中的 SeekableByteChannel 来实现此功能:

在 Java 中写入文件中间最佳方法

http://docs.oracle.com/javase/tutorial/essential/io/rafs.html

我认为出于速度的原因,您会更喜欢使用java.nio。

Java NIO FileChannel versus FileOutputstream性能/有用性

现在您知道如何从任何位置读取,但您需要同时进行此操作。由于它们在文件中保持位置,因此不能使用同一文件访问对象。因此,您需要与线程一样多的文件访问对象。既然您正在读取而不是写入,那应该没问题。

现在您知道如何从许多不同的偏移量并发地读取同一文件。

但请考虑性能。尽管您有许多线程,但只有一个磁盘驱动器,并且随机读取(许多线程访问同一文件)的性能比顺序读取(一个线程读取一个文件)要慢得多。即使它是raid 0或1-也无所谓。顺序读取始终快得多。因此,在您的情况下,我建议您在一个线程中读取文件,并为其他线程提供来自该读取线程的数据。


1

我认为你不能同时读取一个InputStream。这就是为什么合同定义了read,reset和mark的原因-想法是流在内部跟踪已读和未读的内容。

如果你正在读取文件,请打开多个流。您可以使用{{link1:skip()}}方法将标记器向前移动以供其他线程避免重复行处理。 {{link2:BufferedReader}}也可能会有所帮助,因为它提供逐行阅读。


是的,目前我正在使用缓冲读取器+跳过的方式,需要更多的工作,但我相信使用单个顺序读取并将工作移动到其他线程将是一个很好的改进。感谢提供这些链接。 - Alexei Blue

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接