Pandas df.iterrows() 并行化

45

我想要将以下代码并行化:

for row in df.iterrows():
    idx = row[0]
    k = row[1]['Chromosome']
    start,end = row[1]['Bin'].split('-')

    sequence = sequence_from_coordinates(k,1,start,end) #slow download form http

    df.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
    df.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
    df.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))

我尝试过使用 multiprocessing.Pool(),因为每一行可以独立处理,但我无法想出如何共享 DataFrame。我也不确定这是否是使用 Pandas 进行并行处理的最佳方法。有什么帮助吗?


3
默认情况下,按行迭代的速度很慢。你可以尝试找到一种将操作向量化并且不需要迭代的方法,或者将数据框分成几个大块,并同时迭代每个块。 - Khris
当然,那是一种方法。但是如果有更好的方法,我仍在寻找。 - alec_djinn
2
你是否考虑过使用dask?它可以为您完成大部分并行化的工作。 - Zeugma
我不了解Dask,我会去看一下。 - alec_djinn
4个回答

75

正如@Khris在他的评论中所说,你应该将数据框分成几个大块,并并行迭代每个块。你可以随意将数据框分成随机大小的块,但更有意义的是根据你计划使用的进程数量将数据框分成相等大小的块。幸运的是,有人已经为我们解决了这部分问题

# don't forget to import
import pandas as pd
import multiprocessing

# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()

# calculate the chunk size as an integer
chunk_size = int(df.shape[0]/num_processes)

# this solution was reworked from the above link.
# will work even if the length of the dataframe is not evenly divisible by num_processes
chunks = [df.iloc[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]

这将创建一个包含数据框分块的列表。现在,我们需要将其与一个用于操作数据的函数一起传递到进程池中。

def func(d):
   # let's create a function that squares every value in the dataframe
   return d * d

# create our pool with `num_processes` processes
pool = multiprocessing.Pool(processes=num_processes)

# apply our function to each chunk in the list
result = pool.map(func, chunks)

在这一点上,result将是一个列表,其中包含每个块在被操作后的结果。 在本例中,所有值都已经平方。 现在的问题是原始数据框没有被修改,所以我们必须用池中的结果替换它所有现有的值。

for i in range(len(result)):
   # since result[i] is just a dataframe
   # we can reassign the original dataframe based on the index of each chunk
   df.iloc[result[i].index] = result[i]

现在,我用向量化的方式来操作数据框,如果我将其应用于整个数据框而不是分成块,可能会更快。但是,在您的情况下,您的函数会迭代每个块的每一行,然后返回该块。这使您可以同时处理num_process行。

def func(d):
   for row in d.iterrow():
      idx = row[0]
      k = row[1]['Chromosome']
      start,end = row[1]['Bin'].split('-')

      sequence = sequence_from_coordinates(k,1,start,end) #slow download form http
      d.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
      d.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
      d.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
   # return the chunk!
   return d

然后您重新分配原始数据框中的值,就成功地将此过程并行化了。

我应该使用多少进程?

您的最佳性能将取决于这个问题的答案。虽然“全部进程!!!”是一个答案,但更好的答案要复杂得多。在某个点之后,向问题投入更多进程实际上会产生比它所值得的更多开销。这被称为阿姆达尔定律。再次说明我们很幸运,因为其他人已经为我们解决了这个问题:

  1. Python multiprocessing's Pool process limit
  2. How many processes should I run in parallel?

一个很好的默认值是使用 multiprocessing.cpu_count(),这是 multiprocessing.Pool 的默认行为。根据文档,“如果 processes 为 None,则使用 cpu_count() 返回的数字。”这就是为什么我在开始时将 num_processes 设置为 multiprocessing.cpu_count()。这样,如果您将其移动到更强大的计算机上,则可以获得好处,而不必直接更改 num_processes 变量。


9
如果pandas显示警告,请使用chunks = [df.iloc[i:i + chunk_size,:] for i in range(0, df.shape[0], chunk_size)] - Dat
2
np.array_split()可能比chunks = [df.ix[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]更好。前者会自动处理行数不能被整除的情况,语法也更简单。 - N4v
1
np.array_split() 对我也起作用了。 - MrKingsley
1
如果我们有一个带有三个参数的函数,并且我们需要使用数据框的三列,我们应该怎么做? - RF1991

34

更快的方法(在我的情况下大约快10%):

与被接受的答案的主要区别: 使用 pd.concatnp.array_split 来分割和合并数据框。

import multiprocessing
import numpy as np


def parallelize_dataframe(df, func):
    num_cores = multiprocessing.cpu_count()-1  #leave one free to not freeze machine
    num_partitions = num_cores #number of partitions to split dataframe
    df_split = np.array_split(df, num_partitions)
    pool = multiprocessing.Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

其中func是你想要应用于df的函数。如果有多个参数,请使用partial(func, arg=arg_val)


1
只是好奇,pool.map是否保留数据框的顺序。换句话说,pool.map的输出是否与传入的块的顺序相同?如果不是,则pd.concat可能无法按原始顺序重建数据框。我不知道np.aray_split,但我并不惊讶它更快。pd.concat也很可能比使用df.ix重新分配更快。 - TheF1rstPancake
3
是的,据我所知,数据框被正确地重新排列了。我不知道是否有强制执行的方法,但我有时间序列数据,它还没有引起问题。虽然我的索引是时间戳,但如果顺序混乱了,重新排序它们也不应该是个问题。我发现的另一个技巧是使用itertuples(),这样可以再快30%。 - ic_fl2
这个救了我的一天。非常感谢 @ic_fl2。 - SummerEla
你能帮忙回答一下这个问题吗:https://stackoverflow.com/questions/53561794/iteration-over-a-pandas-df-in-parallel - ak3191
1
这是一个非常好的答案! - Jinhua Wang
显示剩余2条评论

14
考虑使用dask.dataframe,例如在类似问题的此示例中所示:https://stackoverflow.com/a/53923034/4340584
Note: "dask"是一个Python库,用于在大型数据集上进行并行计算。
import dask.dataframe as ddf
df_dask = ddf.from_pandas(df, npartitions=4)   # where the number of partitions is the number of cores you want to use
df_dask['output'] = df_dask.apply(lambda x: your_function(x), meta=('str')).compute(scheduler='multiprocessing')

dask的解决方案看起来比在pandas中手动并行计算要简单得多! - sophros

1

如果要在数据框的分区上使用Dask(而不是操作轴的dask.apply),可以使用map_partitions

import multiprocessing
import dask.dataframe as ddf

# get num cpu cores
num_partitions = multiprocessing.cpu_count()

# create dask DF
df_dask = ddf.from_pandas(your_dataframe, npartitions=num_partitions)

# apply func to every partition in parallel
output = df_dask.map_partitions(func, meta=('output_col1_type','output_col2_type')).compute(scheduler='multiprocessing')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接