如何在Python中进行并行编程?

199

对于C ++,我们可以使用OpenMP进行并行编程;但是,OpenMP在Python中不起作用。如果我想要并行运行我的Python程序的某些部分,该怎么办?

代码结构可能被认为是:

solve1(A)
solve2(B)

如何并行运行以下代码以减少运行时间:solve1solve2是两个独立的函数。

代码如下:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
            
        node1 = partition[0]
        node2 = partition[1]
    
        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

其中 setinnersetouter 是两个独立的函数。这就是我想并行处理它们的地方...


40
请参阅multiprocessing。注意:Python的线程不适用于CPU密集型任务,仅适用于I/O密集型任务。 - 9000
7
@9000加100个互联网币,感谢您提到CPU与I/O相关任务。 - Hyperboreus
@9000 实际上,就我所知,线程并不适合于CPU密集型任务!在执行真正的CPU密集型任务时,进程才是正确的选择。 - Omar Al-Ithawi
7
为什么线程在使用许多 CPU 核心时(通常现在是这样)能够正常工作呢?因为您的进程可以运行多个线程,在并行加载所有这些核心的同时隐式地共享公共数据(即,不需要显式共享内存区域或进程间通信)。 - 9000
@9000 >>如果你有很多CPU核心(通常现在都是这样),线程可以很好地工作。然后,您的进程可以运行多个线程,以并行方式加载所有这些核心,并在它们之间隐式共享公共数据<< 你确定吗?因为我在这里读到了关于GIL的一些内容http://python-notes.curiousefficiency.org/en/latest/python3/multicore_python.html,在那里我理解的恰恰与你所说的相反! - AKA
2
@user2134774:嗯,是的,我的第二条评论没有多少意义。可能只有释放GIL的C扩展可以从中受益;例如,NumPy和Pandas的某些部分就是这样做的。在其他情况下,这是错误的(但我现在无法编辑它)。 - 9000
11个回答

219

你可以使用multiprocessing模块。对于这种情况,我可能会使用处理池:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

这将生成可为您执行通用工作的进程。由于我们没有传递 processes,它将为您的机器上每个 CPU 核心生成一个进程。每个 CPU 核心可以同时执行一个进程。

如果您想将列表映射到单个函数,则可以执行以下操作:

args = [A, B]
results = pool.map(solve1, args)

不要使用线程,因为GIL会锁定Python对象的任何操作。


1
pool.map是否也接受字典作为参数?还是只能接受简单的列表? - The Bndr
只是列表,我想。但你可以直接传入dict.items(),它将是一个键值元组的列表。 - Matt Williamson
除了我上一条评论之外:dict.items() 是可以正常工作的。错误是因为我不得不更改 process 函数内部变量的处理方式。不幸的是,错误信息并不是很有帮助...所以:谢谢你的提示 :-) - The Bndr
2
这里的timeout是什么意思? - gamma
Python的dict是无序的。不过,你可以使用OrderedDict来实现有序字典。 - CoderGuy123
显示剩余3条评论

59

使用 Ray 可以非常优雅地完成此操作。

要并行化您的示例,您需要使用 @ray.remote 装饰器定义函数,然后使用 .remote 调用它们。

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

相比于multiprocessing模块,这种方法具有以下几个优点:

  1. 同一份代码可在多核机器和机群上运行。
  2. 进程之间通过共享内存和零拷贝序列化高效地共享数据。
  3. 错误消息传递得很好。
  4. 这些函数调用可以组合在一起使用,例如:

@ray.remote
def f(x):
    return x + 1

x_id = f.remote(1)
y_id = f.remote(x_id)
z_id = f.remote(y_id)
ray.get(z_id)  # returns 4
除了远程调用函数之外,类还可以作为actors在远程实例化。

请注意,Ray是我一直在帮助开发的框架。


我在尝试在Python中安装包时一直收到错误提示:“无法找到满足要求的版本ray(来自版本:)未找到匹配的分发”。 - alwaysaskingquestions
2
通常这种错误意味着您需要升级 pip。我建议尝试 pip install --upgrade pip。如果您需要使用 sudo,则可能安装 raypip 版本与正在升级的版本不同。您可以通过 pip --version 进行检查。此外,目前不支持 Windows,所以如果您在 Windows 上,则可能会出现问题。 - Robert Nishihara
2
只是一条说明,这主要用于在多台机器上分发并发作业。 - Matt Williamson
3
实际上,它既针对单机情况进行了优化,也针对集群环境进行了优化。很多设计决策(例如共享内存、零拷贝序列化)都旨在很好地支持单机。 - Robert Nishihara
3
如果文档更明确地指出这一点将会很好。我从阅读文档中得出的感觉是它并不是真正针对单机情况而设计的。 - Sledge
显示剩余3条评论

7
解决方案,正如其他人所说,是使用多个进程。然而,哪种框架更适合取决于许多因素。除了已经提到的因素外,还有 charm4pympi4py(我是 charm4py 的开发者)。
实现上述示例的更有效方法是使用工作池抽象。主循环在每个1000次迭代中将相同的参数(包括完整图形 G)一遍又一遍地发送给工作进程。由于至少有一个工作进程将驻留在不同的进程上,因此这涉及将参数复制并发送到其他进程。这可能会非常昂贵,具体取决于对象的大小。相反,最好让工作进程存储状态并仅发送更新的信息。
例如,在 charm4py 中可以这样做:
class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

请注意,对于这个例子,我们只需要一个工作线程。主循环可以执行其中一个函数,并让工作线程执行另一个函数。但我的代码有助于说明一些事情:
1. 工作线程A在进程0中运行(与主循环相同)。当result_a.get()被阻塞等待结果时,工作线程A在同一进程中进行计算。 2. 参数自动通过引用传递给工作线程A,因为它在同一个进程中(没有涉及复制)。

6

21
你把不能_真正地_并发运行代码称为“有趣”? :-/ - ManuelSchneid3r

4

您可以使用joblib库进行并行计算和多进程处理。

from joblib import Parallel, delayed

您可以简单地创建一个名为foo的函数,该函数需要以并行方式运行。根据以下代码段实现并行处理:
output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

num_cores 可以从 multiprocessing 库中获取,如下所示:

import multiprocessing

num_cores = multiprocessing.cpu_count()

如果您有一个带有多个输入参数的函数,并且您只想通过列表迭代其中一个参数,您可以使用来自functools库的partial函数如下:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

您可以在此处找到关于Python和R多进程的完整解释以及几个示例


4

在某些情况下,可以使用Numba自动并行化循环,但它仅适用于Python的一小部分。

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

不幸的是,似乎Numba只能处理Numpy数组,而不能处理其他Python对象。理论上,也可能将Python编译为C++,然后使用Intel C++编译器自动并行化它, 但我还没有尝试过。


2
我经常使用Python中的“multiprocessing”本地库来处理并行性。为了控制队列中的进程数,我使用一个共享变量作为计数器。在下面的示例中,您可以看到简单进程的并行执行方式。
我更新了脚本以使其更易于使用。基本上,您需要做的唯一事情就是使用您想要并行运行的函数覆盖“process”方法。查看示例,该过程非常简单。或者,您也可以删除所有执行日志发生的情况。
当我有时间时,我将更新代码以适用于返回值的进程。
要求
user@host:~$ pip install coloredlogs==15.0.1

代码

并行处理脚本(复制并粘贴)

#!/usr/bin/env python
# encoding: utf-8

from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os


LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    # Setting-up the script logging:
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


class ParallelProcessing:
    """
    Parallel processing.

    References
    ----------
    [1] Class `ParallelProcessing`: https://dev59.com/gWIj5IYBdhLWcg3wKyS0#70464369

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    >>>     def process(self, name: str) -> None:
    >>>         logger = get_logger()
    >>>         logger.info(f"Executing process: {name}...")
    >>>         time.sleep(5)
    >>>
    >>>
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """

    _n_jobs: int
    _waiting_time: int
    _queue: Value
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Number of jobs.
        waiting_time: int
            Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
        """
        self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60*60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process and remove it from the process queue by decreasing the queue process counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        self.process(*args)
        self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Error callback.

        Parameters
        ----------
        result: Any
            Result from exceptions.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple], use_multithreading: bool = False) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            List of process parameters (`*args`).
        use_multithreading: bool
            Use multithreading instead multiprocessing.
        """
        manager = Manager()
        self._queue = manager.Value('i', 0)
        lock = manager.Lock()
        pool = Pool(processes=self._n_jobs) if not use_multithreading else ThreadPool(processes=self._n_jobs)

        start_time = datetime.now()

        with lock:  # Write-protecting the processes queue shared variable.
            for args in args_list:
                while True:
                    if self._queue.value < self._n_jobs:
                        self._queue.value += 1

                        # Running processes in parallel:
                        pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)

                        break
                    else:
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)

        pool.close()
        pool.join()

        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")

使用示例:

class MyParallelProcessing(ParallelProcessing):
    def process(self, name: str) -> None:
        """
        Process to run in parallel (overrides abstract method).
        """
        logger = get_logger()
        logger.info(f"Executing process: {name}...")
        time.sleep(5)


def main() -> None:
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]

    mpp = MyParallelProcessing(n_jobs=n_jobs)

    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()

执行和输出

user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478

user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672

user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934

1
经理方面的卓越文章 - M__

2
如果您没有时间去学习其他答案中推荐的库或模块的要求和假设,那么以下内容可能适合您:
1. 给您的脚本提供选项,以便运行任务的各个部分。 2. 当准备并行运行n个部分时,使用`child = subprocess.Popen(args=[sys.argv[0],...])`启动它们,并在额外的选项和/或参数文件中提供部分编号和其他细节,并为每个子进程调用`child.wait()`。
如果您想监视进度,可以在工人完成工作或等待时启动更多的工人,使用`child.poll()`而不是`child.wait()`来检查`child.returncode`是否仍为`None`。
对于大型任务,启动新进程和写入/读取文件的开销很小。对于许多小任务,人们只需启动工人一次,然后通过管道或套接字与他们通信,但这需要大量的工作,并且必须小心地执行,以避免死锁的可能性。在这种情况下,最好学习如何使用其他答案中推荐的模块。

1
这是一个在Windows环境下完整可用的示例;异步处理的优点是节省时间:
import multiprocessing
import time
from multiprocessing import Pool, freeze_support
from multiprocessing import Pool


def f1(a):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


def f2(b):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1

if __name__ == '__main__':

    pool = Pool(multiprocessing.cpu_count())
    result1 = pool.apply_async(f1, [0])
    result2 = pool.apply_async(f2, [9])
    freeze_support()
    t0 = time.time()
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    print(time.time()-t0)
    t0 = time.time()
    aa = f1(1)
    bb = f2(2)
    print(time.time()-t0)

0
你可以将你的Dataframe转换为Dask Dataframe,它可以为你处理并行计算。
import dask.dataframe as dd
pdf = pd.Pandas({"A" : A, "B" : B})
ddf = dd.from_pandas(pdf, npartitions=3)
solve(ddf)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接