os.sched_getaffinity(0)与os.cpu_count()有何不同?

10

我知道标题中这两种方法的区别,但不知道它们的实际影响。

据我所知:如果您使用的NUM_WORKERS多于实际可用的核心数,由于操作系统不断地尝试保持并行性,您将面临严重的性能下降。我不知道这是否属实,但我在某个比我聪明的人的SO上读到了这一点。

os.cpu_count()的文档中,它说:

返回系统中的CPU数量。如果无法确定,则返回None。该数字与当前进程可以使用的CPU数量不等同。可用CPU的数量可以通过len(os.sched_getaffinity(0))获得

那么,如果进程可用的CPU数量超过“系统”中的CPU数量,我想知道“系统”是什么意思。

我只想安全高效地实现multiprocessing.pool功能。因此,这里是我的问题摘要:

这两种方法的实际影响是什么:

NUM_WORKERS = os.cpu_count() - 1
# vs.
NUM_WORKERS = len(os.sched_getaffinity(0)) - 1
< p > -1 是因为我发现,如果在数据处理过程中尝试工作,我的系统会变得不那么卡顿。

3个回答

8
这两个函数非常不同,NUM_WORKERS = os.sched_getaffinity(0) - 1将无法立即成功,因为您尝试从一个集合中减去一个整数会导致TypeError。而os.cpu_count()告诉您系统有多少个内核,os.sched_getaffinity(pid)告诉您哪些线程/进程可以在哪些内核上运行。

os.cpu_count()

os.cpu_count()显示操作系统中可用内核的数量(虚拟内核)。很可能你只有一半的物理内核。是否有意义使用比你实际拥有的物理内核更多的进程,甚至是超过虚拟内核数量,这通常取决于你所做的工作。计算循环越紧密(指令少,缓存未命中少等),使用更多的核心(通过使用更多的worker-process)甚至会出现性能下降。

显然,这也取决于您的系统正在运行什么其他程序,因为您的系统会尝试为系统中的每个线程(作为进程的实际执行单元)提供公平份额的运行时间。因此,无法就使用多少worker进行概括。但是,例如,如果您有一个紧密的循环并且您的系统处于空闲状态,则优化的良好起点为

os.cpu_count() // 2 # same as mp.cpu_count() // 2 

...然后逐步增加。

正如@Frank Yellin所提到的,默认情况下multiprocessing.Pool使用os.cpu_count()作为工作进程数量。

os.sched_getaffinity(pid)

os.sched_getaffinity(pid)

返回PID为pid的进程(如果为零,则返回当前进程)限制的CPU集合。

现在,核心/CPU/处理器亲和性是关于您的线程(在您的工作进程内部)被允许运行的具体(虚拟)核心。您的操作系统为每个核心分配一个ID,从0到(核心数-1),更改亲和性允许限制(“固定”)某个线程在哪个实际核心上可以运行。

至少在Linux上,我发现这意味着如果没有可用的允许核心,子进程的线程将不会运行,即使其他未被允许的核心为空闲状态。因此,“亲和性”在这里有点误导。

调整亲和性时的目标是最小化因上下文切换和核心迁移而导致的缓存失效。您的操作系统通常具有更好的洞察力,并已经尝试使用其调度策略保持缓存“热”,因此除非您知道自己在做什么,否则不能轻易地干预。

默认情况下,亲和性设置为所有核心,对于multiprocessing.Pool而言,如果您的系统除此之外处于空闲状态,更改这一点并没有太大意义。

请注意,尽管这里的文档中提到了“进程”,但实际上设置亲和性是针对每个线程的。因此,例如,在“子”线程中为“零号进程”设置亲和性不会更改主线程或进程中的其他线程的亲和性。但是,子线程从主线程继承其亲和性,子进程(通过其主线程)从父进程的主线程继承亲和性。这会影响所有可能的启动方法(“spawn”,“fork”,“forkserver”)。下面的示例演示了这一点以及如何使用multiprocessing.Pool修改亲和性。

import multiprocessing as mp
import threading
import os


def _location():
    return f"{mp.current_process().name} {threading.current_thread().name}"


def thread_foo():
    print(f"{_location()}, affinity before change: {os.sched_getaffinity(0)}")
    os.sched_setaffinity(0, {4})
    print(f"{_location()}, affinity after change: {os.sched_getaffinity(0)}")


def foo(_, iterations=200e6):

    print(f"{_location()}, affinity before thread_foo:"
          f" {os.sched_getaffinity(0)}")

    for _ in range(int(iterations)):  # some dummy computation
        pass

    t = threading.Thread(target=thread_foo)
    t.start()
    t.join()

    print(f"{_location()}, affinity before exit is unchanged: "
          f"{os.sched_getaffinity(0)}")

    return _


if __name__ == '__main__':

    mp.set_start_method("spawn")  # alternatives on Unix: "fork", "forkserver"

    # for current process, exclude cores 0,1 from affinity-mask
    print(f"parent affinity before change: {os.sched_getaffinity(0)}")
    excluded_cores = {0, 1}
    os.sched_setaffinity(0, os.sched_getaffinity(0).difference(excluded_cores))
    print(f"parent affinity after change: {os.sched_getaffinity(0)}")

    with mp.Pool(2) as pool:
        pool.map(foo, range(5))

输出:

parent affinity before change: {0, 1, 2, 3, 4, 5, 6, 7}
parent affinity after change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-1, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-1, affinity after change: {4}
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-1, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-1, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-2, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-2, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-2, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-2, affinity after change: {4}
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-3, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-3, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}

5
如果你有一些任务,纯粹是100%的CPU绑定,即仅进行计算,那么显然如果进程池大小大于计算机上可用的CPU数量,就无法获得任何优势。但是,如果混合了I/O操作,进程将放弃CPU等待I/O完成(或者例如从网站返回相对较长时间的URL),会怎样呢?在这种情况下,使用超过os.cpu_count()的进程池大小,可能会实现改进的吞吐量,这一点对我来说还不清楚。

更新

以下代码可以证明这一点。该代码最好使用线程,但是却使用了进程。我的台式电脑有8个内核。该程序只是并发地(或者在这种情况下是并行地)获取54个URL。程序接收一个参数,即要使用的进程池大小。不幸的是,创建额外进程的初始开销很大,因此如果创建太多进程,则节省效果会递减。但是,如果任务运行时间长且I/O操作频繁,则创建进程的开销最终是值得的:

from concurrent.futures import ProcessPoolExecutor, as_completed
import requests
from timing import time_it

def get_url(url):
    resp = requests.get(url, headers={'user-agent': 'my-app/0.0.1'})
    return resp.text


@time_it
def main(poolsize):
    urls = [
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
    ]
    with ProcessPoolExecutor(poolsize) as executor:
        futures = {executor.submit(get_url, url): url for url in urls}
        for future in as_completed(futures):
            text = future.result()
            url = futures[future]
            print(url, text[0:80])
            print('-' * 100)

if __name__ == '__main__':
    import sys
    main(int(sys.argv[1]))

8个进程:(我的核心数):

func: main args: [(8,), {}] took: 2.316840410232544 sec.

16个进程:

func: main args: [(16,), {}] took: 1.7964842319488525 sec.

24个进程:

func: main args: [(24,), {}] took: 2.2560818195343018 sec.

顺便说一句,我在这个答案这里中有代码,可以证明你的观点。 - Darkonaut
这种性能提升是由于“虚拟”核心吗? - rocksNwaves
@rocksNwaves 我有4个真实核心 + 4个虚拟核心 = 8 ( == os.cpu_count()). 性能提升是因为被创建的进程在等待 URL 返回时会放弃它们所拥有的核心 (无论是真实还是虚拟的),如果有其他进程在等待核心运行,它现在就能得到机会。 - Booboo
1
好的,所以可以创建一个进程,但不能分配核心。本质上,你所说的是我可以启动尽可能多的进程,这在大量 I/O 或可能需要一些等待时间的操作中可能是有意义的。在等待期间,进程可以放弃核心并允许其他人工作... 所以我唯一的问题是:多进程池是否实际处理这种“我什么也不做,所以我让我的邻居来轮流”这种思维? - rocksNwaves
2
@rocksNwaves 我相当确定,当CPU由于另一个进程进入等待状态而变得可用时,底层操作系统(如Linux或Windows)现在负责调度进程。因此,它是在比Python的Process类更低的级别上完成的。但请记住,与相对轻量级的线程不同,创建进程会变得昂贵,您无法有效地使用它们(请参见我的示例)。这可能是创建Python池时默认值为实际CPU数量的原因(合理)。 - Booboo

3
实现多进程池使用
if processes is None:
    processes = os.cpu_count() or 1

不确定是否回答了你的问题,但至少这是一个数据点。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接