Python多进程池:在执行任务期间动态设置进程数量

6
我们在Python 2.7上提交大型CPU密集型作业(包含许多独立并行进程),这些作业持续数天。当这些作业使用大量进程运行时,机器的响应速度会显著降低。理想情况下,在开发代码时,我希望限制可用的CPU数量,并在晚上尽可能高效地运行尽可能多的进程。
Python multiprocessing库允许您在初始化池时指定进程数。是否有一种方法可以在每次启动新任务时动态更改此数字?
例如,在19-07小时内允许运行20个进程,在07-19小时内允许运行10个进程。
其中一种方法是使用占用大量CPU的活动进程数进行检查。以下是我希望它能够运行的方式:
from multiprocessing import Pool
import time 

pool = Pool(processes=20)

def big_task(x):
    while check_n_process(processes=10) is False:
        time.sleep(60*60)
    x += 1
    return x 


x = 1
multiple_results = [pool.apply_async(big_task, (x)) for i in range(1000)]
print([res.get() for res in multiple_results])

但我需要编写“check_n_process”函数。

还有其他解决这个问题的想法吗?

(代码需要在Python 2.7中运行 - 不可行的bash实现)。

2个回答

4
Python的multiprocessing.Pool没有提供一种改变正在运行的Pool工作进程数量的方式。一个简单的解决方案是依赖第三方工具。
billiard提供的池以前提供过这样的功能。
任务队列框架,如CeleryLuigi肯定允许灵活的工作负载,但更为复杂。
如果使用外部依赖项不可行,则可以尝试以下方法。从这个答案详细说明,您可以基于信号量设置限流机制。
from threading import Semaphore, Lock
from multiprocessing import Pool

def TaskManager(object):
    def __init__(self, pool_size):
        self.pool = Pool(processes=pool_size)
        self.workers = Semaphore(pool_size)
        # ensures the semaphore is not replaced while used
        self.workers_mutex = Lock()  

    def change_pool_size(self, new_size):
        """Set the Pool to a new size."""
        with self.workers_mutex:  
            self.workers = Semaphore(new_size)

    def new_task(self, task):
        """Start a new task, blocks if queue is full."""
        with self.workers_mutex:
            self.workers.acquire()

        self.pool.apply_async(big_task, args=[task], callback=self.task_done))

    def task_done(self):
        """Called once task is done, releases the queue is blocked."""
        with self.workers_mutex:
            self.workers.release()

如果有超过X个worker在忙,池子会阻止进一步尝试调度你的big_tasks。通过控制这个机制,你可以限制并发运行的进程数量。当然,这意味着你放弃了Pool队列机制。
task_manager = TaskManager(20)

while True:
    if seven_in_the_morning():
        task_manager.change_pool_size(10)
    if seven_in_the_evening():
        task_manager.change_pool_size(20)

    task = get_new_task()
    task_manager.new_task()  # blocks here if all workers are busy

这在内存方面是浪费的,因为所有工作进程都在需求之前启动。例如,Pool(1000)将启动1000个Python进程。 - milahu

0

这个问题非常残缺(而且是一个老问题),但您可以通过跟踪正在运行的进程并仅在有利时调用apply_async()来管理负载;如果每个作业都运行不到永远,您可以在工作时间少分派一些任务,或者当os.getloadavg()过高时降低负载。

我在运行多个“scp”以规避我们内部网络上的流量整形时也用此方法来管理网络负载(别告诉任何人!)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接