快速处理大文件并在每行上调用函数

4

我有一个大约有1000万行文本的文件(是的,我的内存足够)。 现在我想要一个MyClass列表(构造函数为MyClass(String s)),其中包含文件的每一行。目前我是这样做的:

List<MyClass> help = Files.lines(Paths.get(s))
                          .parallel()
                          .map(MyClass::new)
                          .collect(Collectors.toList());

但这需要几年的时间才能取得进展。有什么想法可以加速解决这个问题吗?

你可以添加关于你愿意做什么和想要避免什么的信息。 - Pshemo
那么你的操作流程是什么?看起来你只是在每一行创建某个类的一个新实例。更让人担忧的是,你应该.close()你的流:Files.lines()是受I/O限制的。 - fge
1
https://dev59.com/9pHea4cB1Zd3GeqPjwmG 的另一个版本? - PM 77-1
“是的,我有足够的内存吗?”按照今天的标准,一千万行的文本文件并不算很大。 - Solomon Slow
1
作为一个非常简单的优化,您可以尝试使用Arrays.asList(Files.lines(...)...toArray(MyClass[]::new))代替.collect(Collectors.toList())。然而,拥有MCVE会更好。 - Tagir Valeev
显示剩余2条评论
1个回答

2
首先,从Collectors.toList()的文档中提取相关内容如下:

[...] 返回的List的类型、可变性、可序列化性或线程安全性没有任何保证;如果需要更多对返回的List的控制,请使用toCollection(Supplier)

现在,让我们更深入地了解收集器的特征;我们找到了这个:

public static final Collector.Characteristics CONCURRENT

表明此收集器是并发的,这意味着结果容器可以支持从多个线程同时调用累加器函数与相同结果容器。

如果CONCURRENT收集器不是UNORDERED,则只有在应用于无序数据源时才能并发评估。

现在,没有什么保证Collectors.toList()返回的收集器是Concurrent的。
尽管启动您的新类可能需要一些时间,但在这里可以放心地假设此收集器不是并发的。但幸运的是,我们可以使用一个并发收集器,如Java文档中所提到的。因此,让我们尝试一下:
.collect(
        Collector.of(CopyOnWriteArrayList::new,
            List::add,
            (o, o2) -> { o.addAll(o2); return o; },
            Function.<List<String>>identity(),
            Collector.Characteristics.CONCURRENT,
            Collector.Characteristics.IDENTITY_FINISH
        )
    )

这可能会加快速度。
现在,你又有一个问题。你没有关闭流。
这是鲜为人知的,但是 Stream(无论是任何类型的流还是 {Int, Double, Long}Stream)都实现了 AutoCloseable 接口。你需要关闭那些 I/O 绑定的流,而 Files.lines() 就是这样一种流。
所以,试一下这个:
final List<MyClass> list;

try (
    final Stream<String> lines = Files.lines(...);
) {
    list = lines.parallel().map(MyClass::new)
        .collect(seeAbove);
}

使用此 Collector,Eclipse 给出错误:“类型不匹配:无法从 Object 转换为 List<Receiver>”。 - Exagon
2
嗯,这至少能加快你的速度吗? :p - fge
是的,它可以加快速度,但也需要大约10分钟...所以...嗯...这个任务怎么可能只使用我的0.7个CPU呢? - Exagon
那你觉得稍微提高一下呢?例如,你能否用 .forEach() 替换掉你的 .collect() 调用呢? - fge
1
不,我的意思是,有必要一开始就将所有新创建的对象收集到一个集合中吗?难道你不能使用它们来做你需要做的事情吗?例如,stream.parallel().map(MyClass:new).forEach( /* 做一些 MyClass 实例相关的操作 */ ) - fge
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接