以并行的方式将 `BufferedReader` 转换为 `Stream<String>`

6
有没有一种方法可以从BufferedReader reader中接收一个Stream<String> stream,使得stream中的每个字符串都代表reader的一行,同时满足附加条件,即在读取完reader之前直接提供stream?我想要并行处理stream中的数据,以节省时间。
编辑:我想要并行读取和处理数据。我不想并行处理不同的行。它们应该按顺序处理。
让我们举个例子来说明我想要如何节省时间。假设我们的reader会向我们呈现100行。读取一行需要2毫秒,处理一行需要1毫秒。如果我先读取所有行,然后再处理它们,那么需要300毫秒。我想要做的是:一旦读取了一行,我就想要处理它,并并行读取下一行。总时间将为201毫秒。
我不喜欢BufferedReader.lines()的原因:据我所知,当我想处理这些行时,读取才开始。假设我已经有了我的reader,但必须在处理第一行之前进行预计算。让我们假设这需要30毫秒。在上面的例子中,总时间将是231毫秒或使用reader.lines()将是301毫秒(您能告诉我哪个时间是正确的吗?)。但是,可以在读取前15行时并行进行预计算,从而在201毫秒内完成任务。

1
Marko Topolnik写了一个Spliterator包装器,允许您变化批处理大小:https://dev59.com/rWEh5IYBdhLWcg3wTB72#22575506 - Stuart Marks
3个回答

9
你可以使用reader.lines().parallel()。这样,您的输入将被分成块,并且进一步的流操作将并行执行在这些块上。如果进一步的操作需要很长时间,则可能会提高性能。
在你的情况下,默认启发式方法将不起作用,我想你没有准备好的解决方案可以让你使用单行批处理。您可以编写一个自定义的spliterator,在每个行之后进行拆分。查看java.util.Spliterators.AbstractSpliterator实现。可能最简单的解决方案是编写类似的内容,但在trySplit方法中将批处理大小限制为一个元素,并在tryAdvance方法中读取单行。

2

为了实现你想要的功能,通常需要一个线程读取行并将它们添加到阻塞队列中,再由第二个线程从这个阻塞队列中获取行并进行处理。


我希望使用流的概念,我就不必再写那些线程相关的东西了。 - principal-ideal-domain

2
你正在看错地方。你认为一行行的代码会从文件中读取行,但它并不是这样工作的。在读取之前,没有人知道一行是什么,因此你无法告诉底层系统读取一行。
BufferedReader之所以被称为缓冲读取器,是因为它有一个字符缓冲区。该缓冲区默认容量为8192。每当需要新的一行时,缓冲区将被解析以查找换行符并返回该部分。当缓冲区不足以找到当前行时,整个缓冲区将被填充。
现在,填充缓冲区可能会导致请求从底层InputStream中读取字节以填充字符解码器的缓冲区。请求多少字节和实际读取多少字节取决于字符解码器的缓冲区大小,实际编码映射到一个字符需要多少字节以及底层InputStream是否有自己的缓冲区以及其大小。
实际昂贵的操作是从底层流中读取字节,而从读取请求到这些读取操作之间没有简单的映射关系。请求第一行可能会导致从底层文件中读取一个16 KiB的块,而后续一百个请求可能都来自已经填满的缓冲区,并且根本不会引起任何I/O操作。而且,使用Stream API无法改变这种情况。你唯一可以并行化的是在缓冲区中搜索新行字符,但这太简单了,无法从并行执行中受益。
你可以减少所有相关方的缓冲区大小,以实现在处理前一行时并行读取一行的目的,但是这种并行执行永远无法弥补由小缓冲区大小引起的性能下降。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接