多进程写入CSV文件

5

我正在尝试将一个庞大的数据集(约146m行)写入CSV文件。我尝试了以下方法:

def paramlist():
    for row in nodes.itertuples():
        l = []
        for row2 in ref_stops.itertuples():
            l.append((row[1], row[2], row[3], row2[1],
                     row2[2], row2[3], row2[4], haversine(row[3], row[2], row2[3], row2[2])))
        yield l

pool = multiprocessing.Pool()
pool.map(func, paramlist())

def func(params):
    with open(r'big_file.csv', 'a') as f:
        writer = csv.writer(f)
        for row in params:
            writer.writerow(row)

这段代码可以工作,但它会消耗所有的内存并终止程序。
我该如何优化它?


这可能包含相关信息。尝试在打开文件对象时应用缓冲:https://dev59.com/qXzaa4cB1Zd3GeqPWfAl - lennyklb
1
这个最小可复现示例无法工作。缺少 paramanodespool.map(... 返回为空。请添加缺失的代码。 - stovfl
2个回答

4

pool.map会在将可迭代对象的部分提交给池中的工作进程之前消耗整个可迭代对象。这就是为什么会出现内存问题。

你应该使用pool.imap来避免这种情况。关于这一点,请参考此帖子进行详细解释。

话虽如此,我真诚地怀疑多进程处理程序能够像你编写的那样加速程序,因为瓶颈在于磁盘I/O。打开、附加和关闭文件总比一个顺序写操作要慢。并行写入单个文件是不可能的。

假设生成l需要一些时间,如果你按照以下方式编写程序,则可能会加速:

from contextlib import closing
import multiprocessing
import csv
import pandas as pd
import numpy as np

# Just for testing
ref_stops = pd.DataFrame(np.arange(100).reshape((-1, 5)))
nodes = pd.DataFrame(np.arange(400).reshape((-1, 4)))
def haversine(a, b, c, d):
    return a*b*c*d

# This function will be executed by the workers
def join_rows(row):
    row_list = []
    # join row with all rows from `ref_stops` and compute haversine
    for row2 in ref_stops.itertuples():
        row_list.append((row[1], row[2], row[3],
                         row2[1], row2[2], row2[3], row2[4],
                         haversine(row[3], row[2], row2[3], row2[2])))
    return row_list


def main():
    with closing(multiprocessing.Pool()) as pool:
        # joined_rows will contain lists of joined rows in arbitrary order.
        # use name=None so we get proper tuples, pandas named tuples cannot be pickled, see https://github.com/pandas-dev/pandas/issues/11791
        joined_rows = pool.imap_unordered(join_rows, nodes.itertuples(name=None))

        # open file and write out all rows from incoming lists of rows
        with open(r'big_file.csv', 'w') as f:
            writer = csv.writer(f)
            for row_list in joined_rows:
                writer.writerows(row_list)

if __name__ == '__main__':
    main()

我猜你不关心顺序,否则你一开始就不会选择多进程了,对吧?这样做的好处是主进程不再生成行列表,而是由工作进程生成。一旦一个工作进程完成了一个列表,它就会将其返回给主进程,然后主进程将其条目附加到文件中。然后工作进程获取新的行并开始构建另一个列表。

在程序中通常使用更多的 pandas 功能可能也会更好(我猜你正在使用 pandas 数据框架,因为有 itertuples)。例如,你可以创建一个新的数据框架,而不是一个行列表,并使 haversinepandas.Series 对象兼容,这样你就不必在每个条目上调用它。


1

尝试将数据分块写入。 假设您从数据框中写入数据,以部分方式读取数据框,即按照一些块的方式。 每次只写入一个块,这样可以提高性能。


正如您在我的代码中所看到的,我有两个数据集,然后需要进行一些计算并将它们附加到彼此。 - Aleksandr Zakharov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接