Python:如何在Python中运行嵌套的并行处理?

5

我有一个交易员交易数据集df。 我有以下两个级别的循环:

smartTrader =[]

for asset in range(len(Assets)):
    df = df[df['Assets'] == asset]
    # I have some more calculations here
    for trader in range(len(df['TraderID'])):
        # I have some calculations here, If trader is successful, I add his ID  
        # to the list as follows
        smartTrader.append(df['TraderID'][trader])

    # some more calculations here which are related to the first for loop.

我希望可以对Assets中的每个资产进行并行计算,并且对于每个资产的每个交易者也要进行并行计算。当所有这些计算都完成后,我想基于smartTrader列表做额外的分析。

这是我第一次尝试并行处理,请耐心等待,感谢您的帮助。


2
尝试使用 multiprocessing.Pool - kirbyfan64sos
1
我不确定在嵌套的for循环中如何调用此函数,你能给我提供一个小例子吗? - finstats
4个回答

3
如果您使用提供了 multiprocessing 的 fork 的 pathos,那么您可以轻松地嵌套并行映射。Pathos 旨在轻松测试嵌套并行映射的组合,这些组合是嵌套 for 循环的直接翻译。它提供了一系列阻塞、非阻塞、迭代、异步、串行、并行和分布式映射。
>>> from pathos.pools import ProcessPool, ThreadPool
>>> amap = ProcessPool().amap
>>> tmap = ThreadPool().map
>>> from math import sin, cos
>>> print amap(tmap, [sin,cos], [range(10),range(10)]).get()
[[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385, -0.27941549819892586, 0.6569865987187891, 0.9893582466233818, 0.4121184852417566], [1.0, 0.5403023058681398, -0.4161468365471424, -0.9899924966004454, -0.6536436208636119, 0.2836621854632263, 0.9601702866503661, 0.7539022543433046, -0.14550003380861354, -0.9111302618846769]]

以下是一个示例,使用了处理池和线程池。在这个示例中,线程映射调用是阻塞的,而处理映射调用是异步的(请注意最后一行的 get)。

下载 pathoshttps://github.com/uqfoundation 或者使用以下命令安装: $ pip install git+https://github.com/uqfoundation/pathos.git@master


3

使用Ray可以优雅地实现嵌套并行,该系统允许您轻松地并行化和分发Python代码。

假设您想并行化以下嵌套程序

def inner_calculation(asset, trader):
    return trader

def outer_calculation(asset):
    return  asset, [inner_calculation(asset, trader) for trader in range(5)]

inner_results = []
outer_results = []

for asset in range(10):
    outer_result, inner_result = outer_calculation(asset)
    outer_results.append(outer_result)
    inner_results.append(inner_result)

# Then you can filter inner_results to get the final output.

以下是Ray代码,用于并行执行上述代码:
  • 对于要在其自己的进程中并发执行的每个函数,请使用@ray.remote装饰器。远程函数返回一个future(即结果的标识符),而不是结果本身。
  • 调用远程函数f()时,需要添加remote修饰符,即f.remote()
  • 使用ids_to_vals()助手函数将嵌套的id列表转换为值。

请注意,程序结构相同。您只需添加remote,然后使用ids_to_vals()助手函数将远程函数返回的future(ids)转换为值。

import ray

ray.init()

# Define inner calculation as a remote function.
@ray.remote
def inner_calculation(asset, trader):
    return trader

# Define outer calculation to be executed as a remote function.
@ray.remote(num_return_vals = 2)
def outer_calculation(asset):
    return  asset, [inner_calculation.remote(asset, trader) for trader in range(5)]

# Helper to convert a nested list of object ids to a nested list of corresponding objects.
def ids_to_vals(ids):
    if isinstance(ids, ray.ObjectID):
        ids = ray.get(ids)
    if isinstance(ids, ray.ObjectID):
        return ids_to_vals(ids)
    if isinstance(ids, list):
        results = []
        for id in ids:
            results.append(ids_to_vals(id))
        return results
    return ids

outer_result_ids = []
inner_result_ids = []

for asset in range(10):
    outer_result_id, inner_result_id = outer_calculation.remote(asset)
    outer_result_ids.append(outer_result_id)
    inner_result_ids.append(inner_result_id)

outer_results = ids_to_vals(outer_result_ids)
inner_results = ids_to_vals(inner_result_ids)

使用Ray模块相比multiprocessing模块有许多优点。特别是,同一份代码可以在单台机器上运行,也可以在集群上运行。更多关于Ray的优点,请参见this related post

为什么assetinner_calculation的参数?如果我删除该参数,输出结果相同。 - Alex van Houten

1

可能线程,来自标准的python库,是最方便的方法:

import threading

def worker(id):
    #Do you calculations here
    return

threads = []
for asset in range(len(Assets)):
    df = df[df['Assets'] == asset]
    for trader in range(len(df['TraderID'])):
        t = threading.Thread(target=worker, args=(trader,))
        threads.append(t)
        t.start()
    #add semaphore here if you need synchronize results for all traders.

0

不要使用for,而要使用map

import functools
smartTrader =[]

m=map( calculations_as_a_function, 
        [df[df['Assets'] == asset] \
                for asset in range(len(Assets))])
functools.reduce(smartTradder.append, m)

从那时起,您可以尝试不同的并行map实现,例如{{link1:multiprocessing}}或{{link2:stackless}}


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接