当axis=0时,Pandas可以并行应用apply函数

7
我想并行地对所有pandas列应用某些函数。例如,我想以并行方式执行以下操作:
def my_sum(x, a):
    return x + a


df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})
df.apply(lambda x: my_sum(x, 2), axis=0)

我知道有一个swifter包,但它不支持在apply中使用axis=0

NotImplementedError: Swifter无法在大型数据集上执行axis=0的apply操作。Dask当前没有实现axis=0的apply操作。更多细节请参考https://github.com/jmcarpenter2/swifter/issues/10

Dask也不支持在axis=0时进行此操作(根据swifter文档)。

我已经搜索了几个来源,但没有找到简单的解决方案。

不能相信在pandas中会这么复杂。


1
可以看一下Pandarallel - anky
将数据转置并传递 axis=1 - Quang Hoang
@anky_91,我刚试了Pandarallel。它一直卡住不动,永远也结束不了。可能是因为我在Windows上的原因出了问题。我简直不敢相信。在R中,至少有三种简单的解决方案可以解决这个问题。 - Mislav
https://github.com/jmcarpenter2/swifter/pull/98 - 也许这就是你要找的。 - Skarlett
1
我的原始问题与上面的帖子相同。唯一的区别是函数更加复杂。 - Mislav
显示剩余8条评论
4个回答

4

Koalas 提供了一种在数据框上并行执行计算的方法。它接受与 pandas 相同的命令,但在后台使用 Apache Spark 引擎执行。

请注意,您需要可用的并行基础设施才能正确使用它。

在他们的博客文章中,他们比较了以下代码块:

pandas:

import pandas as pd
df = pd.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
# Rename columns
df.columns = [‘x’, ‘y’, ‘z1’]
# Do some operations in place
df[‘x2’] = df.x * df.x

考拉:
import databricks.koalas as ks
df = ks.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
# Rename columns
df.columns = [‘x’, ‘y’, ‘z1’]
# Do some operations in place
df[‘x2’] = df.x * df.x

[pandas, PySpark, pyarrow, matplotlib]的翻译请参阅 https://koalas.readthedocs.io/en/latest/getting_started/install.html - gosuto
我会安装PySpark和pyarrow,尝试后并给您反馈。 - Mislav
这么“简单”的任务需要很多依赖。 - Mislav
ks.from_pandas 也会导致我的会话卡死 :(. 我不明白发生了什么。 - Mislav
1
这项任务并不像看起来那么简单。由于Python本身不是一种并行语言,因此您需要其他引擎来进行翻译。对于Koalas而言,这个引擎就是Apache Spark。 - gosuto

1
真正的答案藏在注释中,因此我将添加这个答案:使用mapply
import pandas as pd
import mapply

mapply.init(n_workers=-1, chunk_size=1)

def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})
df.mapply(lambda x: my_sum(x, 2), axis=0)

我尝试过swifterpandarallel,但是swifter在处理列时根本不起作用,而pandarallel似乎会在所有工作进程上重复工作。只有mapply有效。


不确定是否有意为之,但您将n_workers设置为-1,这对我没有起作用,并且导致了非常令人困惑的错误,直到我将n_workers设置为正数。 - krock
实际上它在Linux上运行良好,但我无法让它在Windows上工作。 - krock

0
在我看来,这个问题应该集中在如何将数据分配到可用资源上。Dask提供了map_partitions,它可以在每个DataFrame分区上应用Python函数。当然,您的工作站可以处理的每个分区的行数取决于可用的硬件资源。以下是一个基于您在问题中提供的信息的示例:
# imports
import dask
from dask import dataframe as dd
import multiprocessing as mp
import numpy as np
import pandas as pd

# range for values to be randomly generated
range_ = {
    "min": 0,
    "max": 100
}

# rows and columns for the fake dataframe
df_shape = (
                int(1e8), # rows
                2 # columns
            )

# some fake data
data_in = pd.DataFrame(np.random.randint(range_["min"], range_["max"], size = df_shape), columns = ["legs", "wings"])

# function to apply adding some value a to the partition
def my_sum(x, a):
    return x + a
"""
applies my_sum on the partitions rowwise (axis = 0)

number of partitions = cpu_count

the scheduler can be:
"threads": Uses a ThreadPool in the local process
"processes": Uses a ProcessPool to spread work between processes
"single-threaded": Uses a for-loop in the current thread
"""
data_out = dd.from_pandas(data_in, npartitions = mp.cpu_count()).map_partitions(
        lambda df: df.apply(
            my_sum, axis = 0, a = 2
        )
).compute(scheduler = "threads")

# inspection
print(data_in.head(5))
print(data_out.head(5))

此实现已经在一个包含100,000,000行和2列的随机生成数据框上进行了测试。

计算机配置
CPU:Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
内存总量:16698340 kB
操作系统:Ubuntu 18.04.4 LTS


0
您可以使用Dask延迟接口来设置自定义工作流程:
import pandas as pd
import dask
import distributed

# start local cluster, by default one worker per core
client = distributed.Client() 

@dask.delayed
def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})    

# Here, we mimic the apply command. However, we do not
# actually run any computation. Instead, that line of code 
# results in a list of delayed objects, which contain the 
# information what computation should be performed eventually
delayeds = [my_sum(df[column], 2) for column in df.columns]

# send the list of delayed objects to the cluster, which will 
# start computing the result in parallel. 
# It returns future objects, pointing to the computation while
# it is still running
futures = client.compute(delayeds)

# get all the results, as soon as they are ready. This returns 
# a list of pandas Series objects, each is one column of the 
# output dataframe
computed_columns = client.gather(futures)

# create dataframe out of individual columns
computed_df = pd.concat(computed_columns, axis = 1)

或者,您还可以使用dask的多进程后端:

import pandas as pd
import dask

@dask.delayed
def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})    

# same as above
delayeds = [my_sum(df[column], 2) for column in df.columns]

# run the computation using the dask's multiprocessing backend
computed_columns = dask.compute(delayeds, scheduler = 'processes')

# create dataframe out of individual columns
computed_df = pd.concat(computed_columns, axis = 1)

distributed.Client()冻结了我的会话。可能是因为这个问题: https://github.com/dask/dask/issues/5525 - Mislav
distributed.Client()是一个方便的方法,但有时会引起麻烦。你可以通过费力地设置集群(“艰难的方式”)(实际上并不难,你只需要在终端中运行两个命令),或者使用多进程后端代替(请参见我两分钟后的编辑)。 - Arco Bast
dask.compute现在永远不会结束。我没有说,我在Windows 10上,使用VScode编写代码,并拥有AMD Ryzen处理器。 - Mislav
@Mislav,也许在您尝试从会话内生成新进程时存在问题。您可以尝试在终端上设置集群,然后从会话内连接到调度程序吗?请参阅此处的说明:https://distributed.dask.org/en/latest/quickstart.html#setup-dask-distributed-the-hard-way。 - Arco Bast

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接