Python 3.5并发Future最快的大文件处理方法

3

我希望您能够用concurrent futures学习多线程/多进程。我尝试使用了以下代码,虽然我知道磁盘IO问题不可避免,但我想最大限度地利用我的RAM和CPU。什么方法是处理大规模数据最常用/最佳的方法?如何使用concurrent futures来处理大型数据集?是否有比下面这些更优先的方法?

方法1:

for folders in os.path.isdir(path):
    p = multiprocessing.Process(pool.apply_async(process_largeFiles(folders)))
    jobs.append(p)
    p.start()

方法二:
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    for folders in os.path.isdir(path):
        executor.submit(process_largeFiles(folders), 100)

方法三:
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for folders in os.path.isdir(path):
        executor.submit(process_largeFiles(folders), 10)

我应该尝试同时使用进程池和线程池吗?

方法(思路):

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as process:
     with concurrent.futures.ThreadPoolExecutor(max_workers=100) as thread:
          for folders in os.path.isdir(path):
              process.submit(thread.submit(process_largeFiles(folders), 100),10)

如何在最广泛的使用情况下最大化我的RAM和CPU利用率?

我知道启动进程需要一些时间,但是如果处理的文件很大,这是否会被抵消?


你有任何可以用来比较的测试数据或函数吗?你自己做过任何比较测试吗?结果如何?你得出了什么结论?这是一个非常广泛的问题,有很多未知因素会影响比较结果。 - wwii
你的任何解决方案都能工作吗? - wwii
你有对代码进行性能分析以找出哪些部分比较慢吗? - MaxNoe
1个回答

1
使用TreadPoolExecutor打开和读取文件,然后使用ProcessPoolExecutor处理数据。
import concurrent.futures
from collections import deque

TPExecutor = concurrent.futures.ThreadPoolExecutor
PPExecutor = concurrent.futures.ProcessPoolExecutor
def get_file(path):
    with open(path) as f:
        data = f.read()
    return data

def process_large_file(s):
    return sum(ord(c) for c in s)

files = [filename1, filename2, filename3, filename4, filename5,
         filename6, filename7, filename8, filename9, filename0]

results = []
completed_futures = collections.deque()

def callback(future, completed=completed_futures):
    completed.append(future)

with TPExecutor(max_workers = 4) as thread_pool_executor:
    data_futures = [thread_pool_executor.submit(get_file, path) for path in files]
with PPExecutor() as process_pool_executor:
    for data_future in concurrent.futures.as_completed(data_futures):
        future = process_pool_executor.submit(process_large_file, data_future.result())
        future.add_done_callback(callback)
        # collect any that have finished
        while completed_futures:
            results.append(completed_futures.pop().result())

使用了一个完成回调函数,这样就不必等待已完成的未来任务。我不知道这对效率有什么影响——主要是为了简化as_completed循环中的逻辑/代码而使用它。

如果由于内存限制需要限制文件或数据的提交,则需要进行重构。根据文件读取时间和处理时间,很难说在任何给定时刻会有多少数据存在于内存中。我认为在as_completed中收集结果应该有助于减轻这种情况。data_futures可能在ProcessPoolExecutor设置完毕之前开始完成,因此需要优化这种顺序。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接