在Docker中运行Python的多进程程序

32

我正在尝试在 Docker 容器中测试 Python 的 multiprocessing,但即使成功创建进程(我有 8 个 CPU 并且创建了 8 个进程),它们始终只占用一个物理 CPU。以下是我的代码:

from sklearn.externals.joblib.parallel import Parallel, delayed
import multiprocessing
import pandas
import numpy
from scipy.stats import linregress
import random
import logging

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pandas.concat(retLst)

def compute_regression(df):
    result = {}

    (slope,intercept,rvalue,pvalue,stderr) = linregress(df.date,df.value)
    result["slope"] = [slope]
    result["intercept"] = [intercept]

    return pandas.DataFrame(result)

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logging.info("start")
    random_list = []
    for i in range(1,10000):
        for j in range(1,100):
            random_list.append({"id":i,"date":j,"value":random.random()})

    df = pandas.DataFrame(random_list)

    df = applyParallel(df.groupby('id'), compute_regression)

    logging.info("end")

当我使用--cpus或--cpuset这样的选项启动时,我尝试了多个docker选项,但它始终只使用1个物理CPU。

这是Docker、Python还是操作系统的问题?Docker版本为1.13.1。

cpu_count()的结果:

>>> import multiprocessing
>>> multiprocessing.cpu_count()
8

运行时,这是一个顶部视图。我们可以看到主进程和8个子进程,但我发现百分比很奇怪。 top截图

然后,如果我更改为4个进程,则使用的CPU总量始终相同: 带有4个线程的top


如果您在 Mac 或 Windows 上运行 Docker,则它运行在虚拟机中。您需要将 Docker 配置为整体,以分配更多的 CPU 给该虚拟机。docker run 的选项不会覆盖它,您只能使用虚拟机允许使用的数量。 - Graham Dumpleton
它实际上是在 Linux 内运行的 :( - hanego
2
你能执行 print(multiprocessing.cpu_count()) 并将结果添加到你的问题中吗? - hansaplast
@hansaplast 我添加了截图。 - hanego
抱歉,我不再拥有它了,我大约3年前打开了这个问题 :/ - hanego
显示剩余6条评论
5个回答

19

https://docs.docker.com/get-started - "基本上,容器只是一个正在运行的进程,加上一些封装功能,以使其与主机和其他容器隔离开来。"

Docker运行在主机上。该主机(或虚拟机)有一定数量的物理(或虚拟)CPU。在您的情况下,multiprocessing.cpu_count() 显示8的原因是这是您系统的CPU数量。使用docker选项,如 --cpus--cpuset-cpus 不会改变您机器的硬件,这就是 cpu_count() 报告的内容。

在我的当前系统上:

# native
$ python -c 'import multiprocessing as mp; print(mp.cpu_count())'
12
# docker
$ docker run -it --rm --cpus 1 --cpuset-cpus 0 python python -c 'import multiprocessing as mp; print(mp.cpu_count())'
12

https://docs.docker.com/config/containers/resource_constraints/#cpu - “默认情况下,每个容器对主机的CPU周期的访问是无限的。” 但是您可以使用--cpus--cpuset-cpus等选项限制容器。 --cpus可以是浮点数,最多达到可用物理CPU数量。您可以将此数字视为分数中的分子<--cpus arg>/<physical CPU's>。如果您有8个物理CPU,并指定--cpus 4,则告诉docker不要使用超过总CPU数量的50%(4/8)。--cpus 1.5将使用18.75%(1.5/8)。 --cpuset-cpus实际上限制了要使用哪些特定的物理/虚拟CPU。
(还有许多其他与CPU相关的选项在docker文档中有介绍。)
这是一个较小的代码示例:
import logging
import multiprocessing
import sys

import psutil
from joblib.parallel import Parallel, delayed

def get_logger():
    logger = logging.getLogger()
    if not logger.hasHandlers():
        handler = logging.StreamHandler(sys.stdout)
        formatter = logging.Formatter("[%(process)d/%(processName)s] %(message)s")
        handler.setFormatter(formatter)
        handler.setLevel(logging.DEBUG)
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
    return logger

def fn1(n):
    get_logger().debug("fn1(%d); cpu# %d", n, psutil.Process().cpu_num())

if __name__ == "__main__":
    get_logger().debug("main")
    Parallel(n_jobs=multiprocessing.cpu_count())(delayed(fn1)(n) for n in range(1, 101))

在本地和Docker中运行此程序将记录以下日志行:
[21/LokyProcess-2] fn1(81); cpu# 11
[28/LokyProcess-9] fn1(82); cpu# 6
[29/LokyProcess-10] fn1(83); cpu# 2
[31/LokyProcess-12] fn1(84); cpu# 0
[22/LokyProcess-3] fn1(85); cpu# 3
[23/LokyProcess-4] fn1(86); cpu# 1
[20/LokyProcess-1] fn1(87); cpu# 7
[25/LokyProcess-6] fn1(88); cpu# 3
[27/LokyProcess-8] fn1(89); cpu# 4
[21/LokyProcess-2] fn1(90); cpu# 9
[28/LokyProcess-9] fn1(91); cpu# 10
[26/LokyProcess-7] fn1(92); cpu# 11
[22/LokyProcess-3] fn1(95); cpu# 9
[29/LokyProcess-10] fn1(93); cpu# 2
[24/LokyProcess-5] fn1(94); cpu# 10
[23/LokyProcess-4] fn1(96); cpu# 1
[20/LokyProcess-1] fn1(97); cpu# 9
[23/LokyProcess-4] fn1(98); cpu# 1
[27/LokyProcess-8] fn1(99); cpu# 4
[21/LokyProcess-2] fn1(100); cpu# 5

请注意,我的系统上所有12个CPU均在使用中。请注意:
  • 同一物理CPU被多个进程使用(进程#22和25使用cpu#3)
  • 一个单独的进程可以使用多个CPU(进程#21使用CPU#11和9)
使用docker run --cpus 1 ...运行相同的程序仍然会导致所有12个CPU被所有12个已启动的进程使用,就像--cpus参数不存在一样。它只限制了docker允许使用的总CPU时间的百分比。
使用docker run --cpusets-cpus 0-1 ...运行相同的程序将导致所有12个已启动的进程仅使用2个物理CPU:
[11/LokyProcess-2] fn1(35); cpu# 0
[11/LokyProcess-2] fn1(36); cpu# 0
[12/LokyProcess-3] fn1(37); cpu# 1
[11/LokyProcess-2] fn1(38); cpu# 0
[15/LokyProcess-6] fn1(39); cpu# 1
[17/LokyProcess-8] fn1(40); cpu# 0
[11/LokyProcess-2] fn1(41); cpu# 0
[10/LokyProcess-1] fn1(42); cpu# 1
[11/LokyProcess-2] fn1(43); cpu# 1
[13/LokyProcess-4] fn1(44); cpu# 1
[12/LokyProcess-3] fn1(45); cpu# 0
[12/LokyProcess-3] fn1(46); cpu# 1

回答声明"他们总是只使用一个物理CPU" - 只有当--cpusets-cpus参数恰好/仅为1个CPU时才为真。
(顺便提一句--本例中设置日志的原因是因为joblib存在一个未解决的bug。)

6

4
您可以通过执行以下命令来测试多处理器是否正常工作:
$ docker run -it --rm ubuntu:20.04
root@somehash:/# apt update && apt install stress
root@somehash:/# stress --cpu 8 # 8 if you have 8 cores

如果您有多个核心,您可以在另一个终端中测试命令htoptop,您应该看到所有内核正在运行。如果您使用htop,则应该看到以下内容。 如下图所示: enter image description here 如果您已经进行到这一步,则说明一切正常。此外,当我运行您提供的脚本时,我看到我的处理器正在按照预期工作,您可以查看下面的图片。(我还添加了显示进程的过程,我在ipython终端内运行您的脚本。我还将from sklearn.externals.joblib.parallel import Parallel, delayed更改为from joblib.parallel import Parallel, delayed,因为否则无法正常工作)。 如下图所示: enter image description here 希望所提供的信息有所帮助。对于其他提示,您可能要检查docker版本。

4

尝试从头开始创建机器(将数值替换为所需数值):

docker-machine rm default
docker-machine create -d virtualbox --virtualbox-cpu-count=8 --virtualbox-memory=8192 --virtualbox-disk-size=10000 default

这只是为了保险起见。

现在是重点:

在运行镜像之前,要指定核心数。以下命令将使用 8 个核心。

docker run -it --cpuset-cpus="0-7" your_image_name

在Docker中检查,如果你不仅在Python上成功了

nproc

祝你好运,让我们知道它的进展情况!


2
''' Distributed load among several Docker containers using Python multiprocessing capabilities '''

import random
import time
import subprocess
import queue
from multiprocessing import Pool, Queue, Lock

LOCK = Lock()
TEST_QUEUE = Queue()


class TestWorker(object):
    ''' This Class is executed by each container '''

    @staticmethod
    def run_test(container_id, value):
        ''' Operation to be executed for each container '''

        cmd = ['docker exec -it {0} echo "I am container {0}!, this is message: {1}"' \
                .format(container_id, value)]
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        for line in process.stdout:
            print(line.decode('utf-8')[:-2])
        process.wait()

    @staticmethod
    def container(container_id):
        ''' Here we get a value from the shared queue '''

        while not TEST_QUEUE.empty():
            LOCK.acquire()
            try:
                value = TEST_QUEUE.get(block=False)
                time.sleep(0.5)
            except queue.Empty:
                print("Queue empty ):")
                return
            print("\nProcessing: {0}\n".format(value))
            LOCK.release()
            TestWorker.run_test(container_id, value)

def master():
    ''' Main controller to set containers and test values '''

    qty = input("How many containers you want to deploy: ")
    msg_val = input("How many random values you want to send among this containers: ")

    print("\nGenerating test messages...\n")
    for _ in range(int(msg_val)):
        item = random.randint(1000, 9999)
        TEST_QUEUE.put(item)
    ids = []
    for _ in range(int(qty)):
        container_id = subprocess.run(["docker", "run", "-it", "-d", "centos:7"], \
                stdout=subprocess.PIPE)
        container_id = container_id.stdout.decode('utf-8')[:-1]
        ids.append(container_id)
    pool = Pool(int(qty))
    pool.map(TestWorker.container, ids)
    pool.close()

master()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接