Python 多进程模块

4
英译中:

编辑:更新了环境信息(请参见第一部分)

环境

我正在使用Python 2.7

Ubuntu 16.04

问题

我有一个应用程序,我已将其简化为三个阶段的过程:

  1. 从多个数据源(HTTP请求、系统信息等)收集数据
  2. 基于这些数据计算指标
  3. 以各种格式输出这些指标

每个阶段必须在进入下一个阶段之前完成,但是每个阶段由可以并行运行的多个子任务组成(我可以发送3个HTTP请求并读取系统日志,同时等待它们返回)

我已将这些阶段划分为模块,并将子任务划分为子模块,因此我的项目层次结构如下:

+ datasources
|-- __init__.py
|-- data_one.py
|-- data_two.py
|-- data_three.py
+ metrics
|-- __init__.py
|-- metric_one.py
|-- metric_two.py
+ outputs
|-- output_one.py
|-- output_two.py
- app.py

`app.py` 大致如下(为简洁起见,这是伪代码):
import datasources
import metrics
import outputs

for datasource in dir(datasources):
    datasource.refresh()
for metric in dir(metrics):
    metric.calculate()
for output in dir(outputs):
    output.dump()

“额外的代码包装了dir调用以忽略系统模块,进行异常处理等等——但这就是它的要点。”
“每个数据源子模块大致如下:”
data = []

def refresh():
    # Populate the "data" member somehow
    data = [1, 2, 3]
    return

每个指标子模块大致看起来如下:
import datasources.data_one as data_one
import datasources.data_two as data_two

data = []

def calculate():
    # Use the datasources to compute the metric
    data = [sum(x) for x in zip(data_one, data_two)]
    return

为了并行化第一个阶段(数据源),我编写了类似以下简单代码的东西:
def run_thread(datasource):
    datasource.refresh()

threads = []
for datasource in dir(datasources):
    thread = threading.Thread(target=run_thread, args=(datasource))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()

这个可以工作,之后我可以计算任何指标,datasources.x.data属性会被填充。
为了并行化第二阶段(指标),因为它更少依赖于I/O而更多依赖于CPU,我感觉简单的线程并不能真正加速,我需要使用multiprocessing模块来利用多个核心。我写了以下代码:
def run_pool(calculate):
    calculate()

pool = multiprocessing.Pool()
pool.map(run_pool, [m.calculate for m in dir(metrics)]
pool.close()
pool.join()

这段代码运行了几秒钟(所以我认为它在工作?)但是当我尝试时:
metrics.metric_one.data

它返回[],就像从未运行该模块一样。
不知何故,使用多进程模块似乎会将线程作用域限制,以至于它们不再共享数据属性。我应该如何重写这个代码,以便可以并行计算每个指标,利用多个核心,但在完成后仍然可以访问数据?

请点击此处了解有关多进程和多线程之间差异的更多信息。 - bendl
2个回答

0

这给了我在谷歌搜索时的一些方向,但并没有像Scott Mermelstein的回答那样真正回答手头的问题。 - stevendesu

0

根据评论再次更新: 由于您使用的是2.7版本,并且处理的是模块而不是对象,因此您在序列化所需内容时遇到了问题。解决方法并不美观。它涉及将每个模块的名称传递给您的操作函数。我更新了partial部分,并删除了with语法。

有几点需要注意:

首先,一般来说,多核比线程更好。使用线程时,您总是有处理全局解释器锁的风险,这可能非常低效。如果您使用多核,则这将成为一个非问题。

其次,您已经掌握了正确的概念,但通过拥有全局模块数据成员使其变得奇怪。让您的源代码返回您感兴趣的数据,并使您的指标(和输出)以数据列表作为输入并输出结果列表。

这将使您的伪代码变成以下内容:

app.py:

import datasources
import metrics
import outputs

pool = multiprocessing.Pool()
data_list = pool.map(lambda o: o.refresh, list(dir(datasources)))
pool.close()
pool.join()

pool = multiprocessing.Pool()
metrics_funcs = [(m, data_list) for m in dir(metrics)]
metrics_list = pool.map(lambda m: m[0].calculate(m[1]), metrics_funcs)
pool.close()
pool.join()

pool = multiprocessing.Pool()
output_funcs = [(o, data_list, metrics_list) for o in dir(outputs)]
output_list = pool.map(lambda o: o[0].dump(o[1], o[2]), output_funcs)
pool.close()
pool.join()

一旦您完成此操作,您的数据源将会如下所示:

def refresh():
    # Populate the "data" member somehow
    return [1, 2, 3]

而你的指标将会呈现如下:

def calculate(data_list):
    # Use the datasources to compute the metric
    return [sum(x) for x in zip(data_list)]

最后,你的输出可能看起来像这样:

def dump(data_list, metrics_list):
    # do whatever; you now have all the information

删除数据 "global" 并传递它使每个部分更加清洁(也更容易测试)。这突出了使每个部分完全独立的重点。正如您所看到的,我所做的就是改变传递给 map 的列表中的内容,在此情况下,我通过将它们作为元组传递并在函数中进行解包来注入所有先前的计算。当然,您不必使用 lambda。您可以单独定义每个函数,但确实没有太多需要定义的内容。然而,如果您确实定义每个函数,您可以使用 partial 函数来减少传递的参数数量。我经常使用这种模式,在您更复杂的代码中,您可能需要使用这种方式。以下是一个示例:

from functools import partial

do_dump(module_name, data_list, metrics_list):
    globals()[module_name].dump(data_list, metrics_list)

invoke = partial(do_dump, data_list=data_list, metrics_list=metrics_list)
with multiprocessing.Pool() as pool:
    output_list = pool.map(invoke, [o.__name__ for o in dir(outputs)])
    pool.close()
    pool.join()

更新,按照评论:

当您使用map时,您可以确保输入的顺序与输出的顺序相匹配,即data_list[i]是运行dir(datasources)[i].refresh()的输出。我建议您对app.py进行更改,而不是将datasources模块导入到metrics中:

data_list = ...
pool.close()
pool.join()
data_map = {name: data_list[i] for i, name in enumerate(dir(datasources))}

然后将 data_map 传递给每个度量。 然后,该度量按名称获取它想要的数据,例如:

d1 = data_map['data_one']
d2 = data_map['data_two']
return [sum(x) for x in zip([d1, d2])]

1
现在要测试一下,但看起来这将解决我的所有问题 :) 可能需要我花点时间来处理异常处理和其他随机检查。再加上我是 Python 新手这个事实也不帮助 :D - stevendesu
1
我已经更新了原始帖子,包括Python版本和操作系统。我正在运行Ubuntu 16.04,而不是Windows。同时,将对象传递而不是函数进行更新会导致类似的错误:无法pickle <type 'module'> - stevendesu
好的。我很惊讶2.7多进程如此不同。在Python 3中,多进程可以通过“fork”或“spawn”启动,并且当它“forks”时,您不必担心pickling-它只使用unix fork。显然,在Python 2中,您仍在使用“spawn”方法,它会pickle您的状态并将其传递... - Scott Mermelstein
有一些解决办法,但都不是很好。理想情况下,你应该以某种方式使用对象,因为你的用法明显是多态的一个例子。然后这个对象就可以被序列化了,而且不会成为麻烦。最简单的方法可能是传递每个模块的“__name__”而不是模块或函数。然后在代码中访问该模块(及其函数)。 - Scott Mermelstein
很好,我很高兴你已经明白了!显然,即使在Python 2.7中,map_async也会转发异常。我从未使用过它,但从map转换到map_async不应该很难。 - Scott Mermelstein
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接