正如@Khris在他的评论中所说,你应该将数据框分成几个大块,并并行迭代每个块。你可以随意将数据框分成随机大小的块,但更有意义的是根据你计划使用的进程数量将数据框分成相等大小的块。幸运的是,有人已经为我们解决了这部分问题:
import pandas as pd
import multiprocessing
num_processes = multiprocessing.cpu_count()
chunk_size = int(df.shape[0]/num_processes)
chunks = [df.iloc[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]
这将创建一个包含数据框分块的列表。现在,我们需要将其与一个用于操作数据的函数一起传递到进程池中。
def func(d):
return d * d
pool = multiprocessing.Pool(processes=num_processes)
result = pool.map(func, chunks)
在这一点上,result
将是一个列表,其中包含每个块在被操作后的结果。 在本例中,所有值都已经平方。 现在的问题是原始数据框没有被修改,所以我们必须用池中的结果替换它所有现有的值。
for i in range(len(result)):
df.iloc[result[i].index] = result[i]
现在,我用向量化的方式来操作数据框,如果我将其应用于整个数据框而不是分成块,可能会更快。但是,在您的情况下,您的函数会迭代每个块的每一行,然后返回该块。这使您可以同时处理num_process
行。
def func(d):
for row in d.iterrow():
idx = row[0]
k = row[1]['Chromosome']
start,end = row[1]['Bin'].split('-')
sequence = sequence_from_coordinates(k,1,start,end)
d.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
d.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
d.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
return d
然后您重新分配原始数据框中的值,就成功地将此过程并行化了。
我应该使用多少进程?
您的最佳性能将取决于这个问题的答案。虽然“全部进程!!!”是一个答案,但更好的答案要复杂得多。在某个点之后,向问题投入更多进程实际上会产生比它所值得的更多开销。这被称为阿姆达尔定律。再次说明我们很幸运,因为其他人已经为我们解决了这个问题:
- Python multiprocessing's Pool process limit
- How many processes should I run in parallel?
一个很好的默认值是使用 multiprocessing.cpu_count()
,这是 multiprocessing.Pool
的默认行为。根据文档,“如果 processes 为 None,则使用 cpu_count() 返回的数字。”这就是为什么我在开始时将 num_processes
设置为 multiprocessing.cpu_count()
。这样,如果您将其移动到更强大的计算机上,则可以获得好处,而不必直接更改 num_processes
变量。