如何在Java中实现多线程来处理200万个文本文件?

4
我需要处理大约2百万个文本文件并生成三元组。
假设我有一个文本文件xyz.txt(这是200万个输入文件之一),它被处理如下:
start(xyz.txt)---->module1(xyz.tpd)------>module2(xyz.adv)-------->module3(xyz.tpl)
请给我建议一个逻辑或概念,使我可以在x64 4GB Windows系统上更快地处理和优化。
module1(正在工作):它使用.bat文件解析txt文件,在其中调用解析器,它是一个独立的系统线程,并在15秒后再次开始解析另一个txt文件,以此类推....
module2(正在工作):它接受.tpd文件作为输入并生成.adv文件。
module3(正在工作):它接受.adv文件作为输入并生成.tpl(三元组)。
我应该从txt文件开始线程还是在其他某个点开始?
我担心如果CPU在上下文切换时被卡住。
有人能提供更好的逻辑,以便我可以尝试吗?

3
这三个步骤是否需要大量的CPU资源,或者你的大部分时间都用于从磁盘中读取/写入数据? - assylias
大多数操作是读/写操作,但解析和三元组生成需要CPU强劲。 - Roshan
如果你使用硬盘,那么你的200万个文件需要大约11小时才能访问(但是你必须打开和关闭如此多的文件)。如果你每15秒处理一个文件,这将需要347天的时间来完成。 - Peter Lawrey
哇!你提出了一些惊人的事实... 所以我现在考虑将工作分配给4台机器... - Roshan
5个回答

4

使用ThreadPoolExecutor。调整它的参数,如活动线程数等,以适应您的环境和系统。


1
如果您有几个小时的时间阅读,这可能会有所帮助:http://hadoop.apache.org/ - Abraham
1
@Abraham,Hadoop是很好的,但你需要额外的开销...在OP的情况下,这是值得怀疑的。 - fge
@RoshanJha 看看我的回答;使用JDK内置的Executors类创建这样的执行器非常容易。实际上,你最困难的任务将是为处理一个文件创建一个Runnable... JDK有所有必要的类来为你完成这项工作(线程安全等)。 - fge

4
最重要的是,你需要编写程序并进行性能分析,查看瓶颈在哪里。很可能磁盘I/O操作是瓶颈,不管多少线程都无法解决问题。
在这种情况下,使用两个(三个?四个?)单独的硬盘可能比最佳多线程解决方案获得更多的速度提升。
此外,一般规则是,只有当你有工作代码并真正知道需要优化时才应该优化你的应用程序。进行性能分析非常重要。
在编写时考虑未来的多线程优化是可以的;架构应足够灵活,以允许未来的优化。

也许你是对的,我只是在等待一个好的开端。 - Roshan
@RoshanJha 我的观点是,先编写一个简单版本的应用程序,然后测试它,对其进行分析,最后再考虑优化。过早地进行优化是编程中所有罪恶的根源(或至少是大部分) -- Donald Knuth - Dariusz

1
这里没有提及你的硬件环境,但基本解决方案是使用固定大小的 ExecutorService,初始大小应为你的执行单元数。
private static final int NR_CPUS = Runtime.getRuntime().availableProcessors();

// Then:

final ExecutorService executor = Executors.newFixedThreadPool(NR_CPUS);

然后,对于每个文件,您可以创建一个Runnable来处理它,并使用其.execute()方法将其提交到线程池中。
请注意,.execute()是异步的;如果提交的可运行对象当前无法运行,它将被放入队列中。

那么,你应该吗?这就是问题所在。目前,JVM本身可用的唯一可靠指标是执行单元的数量。很可能这样的池可能太大或太小,但无论如何,这是一个很好的起点。在运行时适当地调整池的大小是一件非常困难的事情...即使是运行时条件也可能会发生变化。但这样的池是一个安全的赌注,工作最终会完成... - fge

0

听起来像是需要进行数据集成的典型批处理应用程序。虽然我不打算在完全了解您的需求之前就抛出超链接,但您可能需要一种解决方案,该解决方案应在单个VM中工作,并且随着时间的推移,您希望将解决方案扩展到多个VM/机器..而且也许我们开始并没有处理PB级别的数据..尝试使用Spring Batch,它不仅可以在给定上下文中解决问题,还可以让您学会构建思路(思考词汇!)以解决类似的问题。


0
作为起点,我会创建一个IO线程和一组CPU线程。IO线程读取文本文件并将其提供给BlockingQueue,而CPU线程从BlockingQueue中获取文件并处理它们。然后对应用程序进行分析,以查看您应该使用多少个CPU线程来跟上IO线程的步伐(您也可以动态确定这一点,例如从一个CPU线程开始,当BlockingQueue的大小超过阈值时启动另一个线程,可能是20个文件左右)。有可能您会发现只需要一个CPU线程就能跟上IO线程的步伐,这种情况下您的程序是IO限制的,您需要将文本文件放在磁盘上相邻的位置(这样您就可以在除第一个文件外的所有文件上使用顺序读取),或者将它们放在不同的磁盘上以加快应用程序的速度;一个想法是将文件压缩在一起,并使用ZipInputStream读取它们-这将减少读取文件时的磁盘寻道次数,并减少需要读取的数据量。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接