多进程写入 Pandas 数据框

8
所以我现在要做的事情是读取一个列表的列表,并将它们通过名为checker的函数,然后让log_result处理checker函数的结果。我正在尝试使用多线程来完成这个过程,因为实际上变量名rows_to_parse有数百万行,所以使用多个核心应该可以大大加快这个过程。
目前的代码无法运行并导致Python崩溃。
我的担忧和问题:
  1. 希望保持存在于变量df中的现有df在整个过程中保持索引不变,否则log_result将会对需要更新的行感到困惑。
  2. 我相当确定apply_async不是执行此任务的适当多进程函数,因为我认为计算机读取和写入df的顺序可能会破坏它???
  3. 我认为可能需要设置一个队列来写入和读取df,但我不确定如何去做。
感谢任何帮助。
import pandas as pd
import multiprocessing
from functools import partial

def checker(a,b,c,d,e):
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
    index_of_match = match.index.tolist()
    if len(index_of_match) == 1: #one match in df
        return index_of_match
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__:
        return [index_of_match[0]]
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
        return [a,b,c,d,e]



def log_result(result, dataf):
    if len(result) == 1: #
        dataf.loc[result[0]]['e'] += 1 
    else: #append new row to exisiting df
        new_row = pd.DataFrame([result],columns=cols)
        dataf = dataf.append(new_row,ignore_index=True)


def apply_async_with_callback(parsing_material, dfr):
    pool = multiprocessing.Pool()
    for var_a, var_b, var_c, var_d, var_e in parsing_material:
        pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
    pool.close()
    pool.join()



if __name__ == '__main__':
    #setting up main dataframe
    cols = ['a','b','c','d','e']
    existing_data = [["YES","A","16052011","13031999",3],
                    ["NO","Q","11022003","15081999",3],
                    ["YES","A","22082010","03012001",9]]

    #main dataframe
    df = pd.DataFrame(existing_data,columns=cols)

    #new data
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                    ['YES', 'W', '17061992', '26032012', 6],
                    ['YES', 'G', '01122006', '07082014', 2],
                    ['YES', 'N', '06081992', '21052008', 9],
                    ['YES', 'Y', '18051995', '24011996', 6],
                    ['NO', 'Q', '11022003', '15081999', 3],
                    ['NO', 'O', '20112004', '28062008', 0],
                    ['YES', 'R', '10071994', '03091996', 8],
                    ['NO', 'C', '09091998', '22051992', 1],
                    ['YES', 'Q', '01051995', '02012000', 3],
                    ['YES', 'Q', '26022015', '26092007', 5],
                    ['NO', 'F', '15072002', '17062001', 8],
                    ['YES', 'I', '24092006', '03112003', 2],
                    ['YES', 'A', '22082010', '03012001', 9],
                    ['YES', 'I', '15072016', '30092005', 7],
                    ['YES', 'Y', '08111999', '02022006', 3],
                    ['NO', 'V', '04012016', '10061996', 1],
                    ['NO', 'I', '21012003', '11022001', 6],
                    ['NO', 'P', '06041992', '30111993', 6],
                    ['NO', 'W', '30081992', '02012016', 6]]


    apply_async_with_callback(rows_to_parse, df)

什么是else: #没有匹配,给它参数写入df应该做什么?我认为如果你return [a, b, c, d, e],你的代码实际上会完成,但你会有其他问题,你也从未在任何地方使用dataf。 - Padraic Cunningham
谢谢您指出这一点,我已经修改了代码。所以 [a,b,c,d,e] 会被写入到 log_result 函数中的 df 中。 - user3374113
partial(log_result,dataf=dfr) 的签名与 log_results 不匹配。 - mdurant
1个回答

15

在多进程中这样更新DataFrame是行不通的:

dataf = dataf.append(new_row,ignore_index=True)

首先,这种方式非常低效(每次附加的复杂度为O(n),因此总复杂度为O(n^2))。更好的方法是在一次操作中将一些对象连接起来。

其次,更重要的是,对于每次更新,dataf没有进行锁定,因此不能保证两个操作不会发生冲突(我猜测这导致了Python崩溃)。

最后,append不会就地执行,因此变量dataf在回调结束后被丢弃!!并且未对父dataf进行任何更改。


我们可以使用MultiProcessing列表字典。如果您不关心顺序,则使用列表;如果您关心顺序(例如enumerate),则使用字典,但必须注意从async返回的值没有定义的顺序。
(或者我们可以创建一个实现Lock的对象,详见Eli Bendersky。)
因此进行以下更改:

df = pd.DataFrame(existing_data,columns=cols)
# becomes
df = pd.DataFrame(existing_data,columns=cols)
d = MultiProcessing.list([df])

dataf = dataf.append(new_row,ignore_index=True)
# becomes
d.append(new_row)

现在,一旦异步操作完成,您就会得到一个包含多个DataFrames的MultiProcessing.list。 您可以使用concat函数(和ignore_index参数)将它们合并以获得所需的结果:

现在,一旦异步操作完成,您就会得到一个包含多个DataFrames的MultiProcessing.list。 您可以使用concat函数(和ignore_index参数)将它们合并以获得所需的结果:

pd.concat(d, ignore_index=True)

应该能解决问题。


注意:在每个阶段创建新的DataFrame,也比让pandas直接解析列表中的列表到DataFrame要低效。希望这只是一个玩具示例,实际上你需要让你的块相当大才能获得多进程的胜利(我听说50kb作为经验法则...),一次一行永远不会在这里获胜。


另外:在代码中避免使用全局变量(如df),将它们作为函数参数传递更加干净(在这种情况下,作为checker的参数)。


如果您不重新分配DataFrame并且从未两次写入相同位置,那么从多个线程同时写入它是否安全? - BallpointBen
这在 MultiProcessing.list/dict 上应该没问题,它会处理锁定,所以是安全的。 - Andy Hayden

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接