限制Python多进程的总CPU使用率

29

我正在使用multiprocessing.Pool.imap在Windows 7上用Python 2.7并行运行许多独立的作业。使用默认设置,根据Windows任务管理器的显示,我的总CPU使用率达到了100%。这使得在我的代码在后台运行时无法进行任何其他工作。

我已经尝试将进程数量限制为CPU数量减1,如How to limit the number of processors that Python uses所述:

pool = Pool(processes=max(multiprocessing.cpu_count()-1, 1)
for p in pool.imap(func, iterable):
     ...

这确实减少了运行进程的总数。然而,每个进程只是需要更多的周期来弥补它。所以我的总CPU使用率仍然被固定在100%。

有没有直接限制总CPU使用率的方法 - 不仅仅是进程数 - 或者如果没有,请问是否有任何解决方法?


1
移除一个核心是不够的。由于GIL的存在,Python只会消耗1个核心的处理时间,除非你在模块(例如numpy)或子进程中执行了一些线程操作。但主进程可能并不完全空闲,而Windows本身也是一个占用资源的程序。根据你的代码和系统上其他正在运行的程序,2/3的CPU可能是最大值。 - tdelaney
1
您可以使用win32工具(例如:http://code.activestate.com/recipes/496767-set-process-priority-in-windows/)或者使用`psutil`的`Process.nice`来降低进程优先级。这样虽然仍然可以获得100%的使用率,但是会减少干扰。您仍然希望运行少于所有核心的进程。 - tdelaney
@tdelaney 谢谢,这些提示很有帮助!如果您觉得可以发布一个结合了pool和psutil的代码示例,那肯定是对问题的很好回答! - Dave Kielpinski
3个回答

54
The solution depends on what you want to do. Here are a few options:

降低进程优先级

您可以nice 子进程。这样,虽然它们仍会占用100%的CPU,但当您启动其他应用程序时,操作系统会优先考虑其他应用程序。如果您想让一个工作密集型计算在笔记本电脑的后台运行,并且不关心CPU风扇一直运转,那么使用psutils设置nice值就是您的解决方案。此脚本是一个测试脚本,会在所有核心上运行足够长的时间,以便您了解其行为。

from multiprocessing import Pool, cpu_count
import math
import psutil
import os

def f(i):
    return math.sqrt(i)

def limit_cpu():
    "is called at every process start"
    p = psutil.Process(os.getpid())
    # set to lowest priority, this is windows only, on Unix use ps.nice(19)
    p.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)

if __name__ == '__main__':
    # start "number of cores" processes
    pool = Pool(None, limit_cpu)
    for p in pool.imap(f, range(10**8)):
        pass

这个技巧在于 limit_cpu 在每个进程开始时运行(参见文档中的initializer参数)。而 Unix 有从 -19(最高优先级)到 19(最低优先级)的级别,Windows 则有几个不同的级别来给予优先级。 BELOW_NORMAL_PRIORITY_CLASS 可能最符合您的要求,还有一个 IDLE_PRIORITY_CLASS,它告诉 Windows 只在系统空闲时运行您的进程。
您可以在任务管理器中切换到详细模式并右键单击进程来查看优先级:

enter image description here

进程数减少

虽然您已经拒绝了这个选项,但它仍然可能是一个不错的选择:假设您使用 pool = Pool(max(cpu_count()//2, 1)) 将子进程数量限制为CPU核心数的一半,则操作系统最初会在一半的CPU核心上运行这些进程,而其他核心则保持空闲状态或只运行当前正在运行的其他应用程序。过了一段时间,操作系统会重新安排这些进程,并可能将它们移动到其他CPU核心等。无论是 Windows 还是基于 Unix 的系统都可以这样处理。

Windows:在4个核心上运行2个进程:

OSX:在8个核心上运行4个进程

enter image description here

你会发现两个操作系统都在核心之间平衡进程,虽然不是完全均匀的,所以你仍然会看到一些核心的百分比比其他核心高。

睡眠

如果你绝对想确保你的进程从不占用某个核心的100%(例如,如果你想防止CPU风扇加速),那么你可以在处理函数中运行sleep:

from time import sleep

def f(i):
    sleep(0.01)
    return math.sqrt(i)

这将使操作系统每次计算后调度你的进程0.01秒,并为其他应用程序腾出空间。如果没有其他应用程序,则CPU内核处于空闲状态,因此它永远不会达到100%。您需要尝试不同的睡眠时间,它也会因运行计算机而异。如果你想变得非常复杂,你可以根据cpu_times()的报告调整睡眠时间。

这非常有帮助!我没有接受它作为答案的唯一原因是我的问题是针对Windows的,而你主要解决Linux和OSX。 - Dave Kielpinski
3
@DaveKielpinski 我还没有时间深入研究Windows的实现,但看起来 p.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) 是你想要的。 - tdelaney
1
@DaveKielpinski 快速回答:psutil 承诺在 Windows 上的工作方式与 Linux 相同,如果我有时间,我也会在 Windows 上进行检查,但我预计只会有一些细微的差别。 - hansaplast
@DaveKielpinski,感谢您的输入。实际上,在Windows上p.nice(19)无法工作,因为Windows使用这些不同的级别。 - hansaplast
@hansapalast,感谢您提供如此丰富的答案! - Dave Kielpinski

6

关于操作系统

你可以使用nice命令为单个命令设置优先级。你也可以使用nice启动一个Python脚本。(以下来自: http://blog.scoutapp.com/articles/2014/11/04/restricting-process-cpu-usage-using-nice-cpulimit-and-cgroups)

nice

nice命令可以调整进程的优先级,使其运行频率降低。这在需要将CPU密集型任务作为后台或批处理作业运行时非常有用。niceness级别范围从-20(最有利的调度)到19(最不利的调度)。Linux上的进程默认以niceness 0启动。nice命令(没有任何其他参数)将以niceness 10启动一个进程。在这个级别下,调度程序将把它视为较低优先级的任务,并给予它更少的CPU资源。启动两个matho-primes任务,一个带有nice,一个没有:

nice matho-primes 0 9999999999 > /dev/null &matho-primes 0 9999999999 > /dev/null &
matho-primes 0 9999999999 > /dev/null &

现在运行 top。

enter image description here

作为Python中的一个函数

另一种方法是使用psutils检查过去一分钟的CPU平均负载,然后让您的线程检查CPU平均负载,如果低于指定的CPU负载目标,则启动另一个线程,并在超过CPU负载目标时休眠或终止该线程。这将在您使用计算机时不会妨碍您,但将保持恒定的CPU负载。

# Import Python modules
import time
import os
import multiprocessing
import psutil
import math
from random import randint

# Main task function
def main_process(item_queue, args_array):
    # Go through each link in the array passed in.
    while not item_queue.empty():
        # Get the next item in the queue
        item = item_queue.get()
        # Create a random number to simulate threads that
        # are not all going to be the same
        randomizer = randint(100, 100000)
        for i in range(randomizer):
            algo_seed = math.sqrt(math.sqrt(i * randomizer) % randomizer)
        # Check if the thread should continue based on current load balance
        if spool_down_load_balance():
            print "Process " + str(os.getpid()) + " saying goodnight..."
            break

# This function will build a queue and
def start_thread_process(queue_pile, args_array):
    # Create a Queue to hold link pile and share between threads
    item_queue = multiprocessing.Queue()
    # Put all the initial items into the queue
    for item in queue_pile:
        item_queue.put(item)
    # Append the load balancer thread to the loop
    load_balance_process = multiprocessing.Process(target=spool_up_load_balance, args=(item_queue, args_array))
    # Loop through and start all processes
    load_balance_process.start()
    # This .join() function prevents the script from progressing further.
    load_balance_process.join()

# Spool down the thread balance when load is too high
def spool_down_load_balance():
    # Get the count of CPU cores
    core_count = psutil.cpu_count()
    # Calulate the short term load average of past minute
    one_minute_load_average = os.getloadavg()[0] / core_count
    # If load balance above the max return True to kill the process
    if one_minute_load_average > args_array['cpu_target']:
        print "-Unacceptable load balance detected. Killing process " + str(os.getpid()) + "..."
        return True

# Load balancer thread function
def spool_up_load_balance(item_queue, args_array):

    print "[Starting load balancer...]"
    # Get the count of CPU cores
    core_count = psutil.cpu_count()
    # While there is still links in queue
    while not item_queue.empty():
        print "[Calculating load balance...]"
        # Check the 1 minute average CPU load balance
        # returns 1,5,15 minute load averages
        one_minute_load_average = os.getloadavg()[0] / core_count
        # If the load average much less than target, start a group of new threads
        if one_minute_load_average < args_array['cpu_target'] / 2:
            # Print message and log that load balancer is starting another thread
            print "Starting another thread group due to low CPU load balance of: " + str(one_minute_load_average * 100) + "%"
            time.sleep(5)
            # Start another group of threads
            for i in range(3):
                start_new_thread = multiprocessing.Process(target=main_process,args=(item_queue, args_array))
                start_new_thread.start()
            # Allow the added threads to have an impact on the CPU balance
            # before checking the one minute average again
            time.sleep(20)

        # If load average less than target start single thread
        elif one_minute_load_average < args_array['cpu_target']:
            # Print message and log that load balancer is starting another thread
            print "Starting another single thread due to low CPU load balance of: " + str(one_minute_load_average * 100) + "%"
            # Start another thread
            start_new_thread = multiprocessing.Process(target=main_process,args=(item_queue, args_array))
            start_new_thread.start()
            # Allow the added threads to have an impact on the CPU balance
            # before checking the one minute average again
            time.sleep(20)

        else:
            # Print CPU load balance
            print "Reporting stable CPU load balance: " + str(one_minute_load_average * 100) + "%"
            # Sleep for another minute while
            time.sleep(20)

if __name__=="__main__":

    # Set the queue size
    queue_size = 10000
    # Define an arguments array to pass around all the values
    args_array = {
        # Set some initial CPU load values as a CPU usage goal
        "cpu_target" : 0.60,
        # When CPU load is significantly low, start this number
        # of threads
        "thread_group_size" : 3
    }

    # Create an array of fixed length to act as queue
    queue_pile = list(range(queue_size))
    # Set main process start time
    start_time = time.time()
    # Start the main process
    start_thread_process(queue_pile, args_array)
    print '[Finished processing the entire queue! Time consuming:{0} Time Finished: {1}]'.format(time.time() - start_time, time.strftime("%c"))

-1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接