我经常使用 Python 标准库中的 `multiprocessing` 模块来实现并行处理。为了控制队列中同时运行的进程数，我使用一个共享变量作为计数器。在下面的示例中，您可以看到几个简单进程是如何并行执行的。
我更新了脚本，使其更易于使用。基本上，您唯一需要做的就是在子类中重写 `process` 方法，放入您想要并行运行的函数。请参阅下面的示例，整个过程非常简单。此外，您也可以根据需要删除所有与执行日志相关的代码。
等我有时间时，我会更新代码，使其支持有返回值的进程。
依赖要求
user@host:~$ pip install coloredlogs==15.0.1
代码
并行处理脚本（可直接复制粘贴）：
import logging
import os
import sys
import time
from datetime import datetime
from logging import Logger
from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterable, Iterator

import coloredlogs
LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    """
    Return a named logger that writes colored records to standard output.

    Parameters
    ----------
    name: str
        Logger name, passed to `logging.getLogger`.
    level: str
        Level name: one of "NOTSET", "DEBUG", "INFO", "WARNING", "ERROR",
        "CRITICAL".

    Returns
    -------
    Logger
        The logger `name`, with a `coloredlogs` handler installed at `level`.

    Raises
    ------
    ValueError
        If `level` is not one of the accepted level names.
    """
    valid_levels = ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
    # Explicit check instead of `assert`: asserts are stripped when Python runs
    # with -O, which would let an invalid level slip through silently.
    if level not in valid_levels:
        raise ValueError(f"Invalid log level {level!r}; expected one of {valid_levels}.")
    # Configures the root logger; `basicConfig` does nothing if the root logger
    # already has handlers, so repeated calls are harmless.
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level,
    )
    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)
    return logger
class ParallelProcessing:
    """
    Parallel processing with a bounded number of in-flight tasks.

    Subclass it, override `process` with the function to run in parallel, and
    call `run` with one tuple of positional arguments per task. At most
    `n_jobs` tasks are submitted to the pool at any time; each further tuple
    waits (polling every `waiting_time` seconds) until a slot is free.

    References
    ----------
    [1] Class `ParallelProcessing`: https://dev59.com/gWIj5IYBdhLWcg3wKyS0#70464369

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    ...     def process(self, name: str) -> None:
    ...         logger = get_logger()
    ...         logger.info(f"Executing process: {name}...")
    ...         time.sleep(5)
    ...
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """

    _n_jobs: int
    _waiting_time: int
    # Manager-backed counter of submitted-but-unfinished tasks (shared with workers).
    _queue: Value
    # Manager lock that serializes every read-modify-write of `_queue`.
    _lock: Any
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiate a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Maximum number of tasks running at once. Values below 1 (the
            default is -1) mean "use `cpu_count()`".
        waiting_time: int
            Seconds to sleep before re-checking when all slots are busy, i.e.
            `_queue.value == _n_jobs`. A negative value means 3600 seconds.
        """
        # `Pool(processes=0)` raises ValueError, so 0 is treated like -1 instead
        # of failing later inside `run`.
        self._n_jobs = n_jobs if n_jobs > 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60 * 60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.

        Raises
        ------
        NotImplementedError
            Always, unless a subclass overrides this method.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process, then free its slot in the shared task counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        try:
            self.process(*args)
        finally:
            # Release even when `process` raises so the counter never leaks a slot.
            self._release_slot()

    def _try_acquire_slot(self) -> bool:
        """Reserve a slot if fewer than `_n_jobs` tasks are in flight; return success."""
        # Check and increment under one lock so they cannot interleave with a
        # worker's decrement (proxy `+=` / `-=` are separate get/set round-trips).
        with self._lock:
            if self._queue.value < self._n_jobs:
                self._queue.value += 1
                return True
            return False

    def _release_slot(self) -> None:
        """Free a slot previously reserved by `_try_acquire_slot`."""
        with self._lock:
            self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Log the exception raised by a task and abort the whole program.

        Registered as the pool's `error_callback` in `run`. `os._exit(1)` ends
        the process immediately with status 1 without running cleanup handlers;
        this is a deliberate fail-fast choice.

        Parameters
        ----------
        result: Any
            Exception raised by the task.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterable[tuple], use_multithreading: bool = False) -> None:
        """
        Run `process` once per argument tuple in parallel and block until all finish.

        Parameters
        ----------
        args_list: Iterable[tuple]
            Positional-argument tuples, one per task (a list, a generator, ...).
            It is consumed lazily: each tuple is submitted only once a slot is free.
        use_multithreading: bool
            Use a thread pool instead of a process pool.
        """
        # The context manager shuts the manager's server process down afterwards.
        with Manager() as manager:
            self._queue = manager.Value('i', 0)
            self._lock = manager.Lock()
            pool_cls = ThreadPool if use_multithreading else Pool
            pool = pool_cls(processes=self._n_jobs)
            start_time = datetime.now()
            try:
                for args in args_list:
                    while not self._try_acquire_slot():
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)
                    pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)
                pool.close()
                pool.join()
            finally:
                # Harmless after a successful join; on an exception (e.g. Ctrl-C or
                # a failing `args_list`) it stops the workers instead of leaking them.
                pool.terminate()
            exec_time = datetime.now() - start_time
            self._logger.info(f"Execution time: {exec_time}")
使用示例:
class MyParallelProcessing(ParallelProcessing):
    """Example subclass: each task logs its name, then sleeps for five seconds."""

    def process(self, name: str) -> None:
        """
        Task body run in parallel (overrides `ParallelProcessing.process`).

        Parameters
        ----------
        name: str
            Label included in the log message.
        """
        task_logger = get_logger()
        message = f"Executing process: {name}..."
        task_logger.info(message)
        time.sleep(5)
def main() -> None:
    """
    Run the demo: six tasks ("A" to "F") through `MyParallelProcessing`.

    The number of parallel jobs is read from the first command-line argument
    (e.g. `python run.py 3`). When it is omitted, -1 is passed, which
    `ParallelProcessing` maps to `cpu_count()`.
    """
    # Default to -1 (all cores) so running without an argument works instead of
    # raising IndexError on `sys.argv[1]`.
    n_jobs = int(sys.argv[1]) if len(sys.argv) > 1 else -1
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    mpp = MyParallelProcessing(n_jobs=n_jobs)
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()
执行和输出
user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934