Python - 多进程处理 - 更多的任务数量超过了CPU数量。

4
我知道可以使用Pool类来完成任务,但我想更细致地控制问题。我的任务数量多于处理器数量,因此我不希望它们同时运行。
例如:
from multiprocessing import Process,cpu_count
for dir_name in directories:
    src_dir = os.path.join(top_level,dir_name)
    dst_dir = src_dir.replace(args.src_dir,args.target_dir)
    p = Process(target=transfer_directory, args=(src_dir, dst_dir,))
    p.start()

然而,如果我有超过16个目录,那么我将启动比我的处理器更多的作业。这是我的解决方案,真的很巧妙。

from multiprocessing import Process,cpu_count
jobs = []
for dir_name in directories:
    src_dir = os.path.join(top_level,dir_name)
    dst_dir = src_dir.replace(args.src_dir,args.target_dir)
    p = Process(target=transfer_directory, args=(src_dir, dst_dir,))
    jobs.append(p)

alive_jobs = []
while jobs:
    if len(alive_jobs) >= cpu_count():
        time.sleep(5)
        print alive_jobs
        for aj in alive_jobs:
            if aj.is_alive():
                continue
            else:
                print "job {} removed".format(aj)
                alive_jobs.remove(aj)

        continue

    for job in jobs:
        if job.is_alive():
            continue
        job.start()
        alive_jobs.append(job)
        print alive_jobs
        jobs.remove(job)
        if len(alive_jobs) >= cpu_count():
            break

有没有使用内置工具更好的解决方案?

这给你什么更精细的控制? - Peter Wood
如果有人有解决方案,可以在不冻结并关闭终端的情况下键盘中断池模块。 - jwillis0720
同时:https://dev59.com/om3Xa4cB1Zd3GeqPisup - Brendan Long
1个回答

5
我想在这里分享我的想法:创建与CPU数量相等的进程,使用一个队列来存储所有目录,并将队列传递给你的transfer_directory方法。一旦一个进程完成工作,就从队列中取出一个dir_name。草稿如下:
NUM_OF_PROCESSES = multiprocessing.cpu_count()
TIME_OUT_IN_SECONDS = 60

for dir_name in directories:
    my_queue.put(dir_name)

# creates processes that equals to number of CPU 
processes = [multiprocessing.Process(target=transfer_directory, args=(my_queue,)) for x in range(NUM_OF_PROCESSES)]

# starts processes
for p in processes:
    p.start()

# blocks the calling thread
for p in processes:
    p.join()



def transfer_directory(my_queue):
    """processes element of directory queue if queue is not empty"""
    while my_queue is not empty:
        dir_name = my_queue.get(timeout=TIME_OUT_IN_SECONDS)
        src_dir = os.path.join(top_level,dir_name)
        dst_dir = src_dir.replace(args.src_dir,args.target_dir)

编辑: 它还可以高效地读取大文件。 我曾经苦恼于如何使用multiprocessing来读取一个非常大的文件(超过1000万行),最终我使用了单进程启动producer(a_queue)仅仅是将行读入队列并放置,然后启动多个consumers(a_queue)a_queue中取出行并执行耗时工作,对我来说这种方法很有效。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接