使用Dask并行读取、处理和合并Pandas数据框。

4

我正在尝试同时读取和处理一系列csv文件,并将输出合并到单个pandas dataframe中以进行进一步处理。

我的工作流程包括3个步骤:

  • 通过读取一系列csv文件(具有相同的结构)创建一系列pandas dataframe

    def loadcsv(filename): df = pd.read_csv(filename) return df

  • 为每个dataframe创建一个新列,通过处理2个现有列来实现

    def makegeom(a,b): return 'Point(%s %s)' % (a,b)

    def applygeom(df): df['Geom']= df.apply(lambda row: makegeom(row['Easting'], row['Northing']), axis=1) return df

  • 将所有dataframe连接成一个单独的dataframe

    frames = [] for i in csvtest: df = applygeom(loadcsv(i)) frames.append(df) mergedresult1 = pd.concat(frames)

在我的工作流中,我使用pandas(每个csv(15)文件都有超过2 * 10 ^ 6个数据点),因此完成需要一段时间。我认为这种工作流应该利用一些并行处理(至少对于read_csv和apply步骤),所以我尝试了dask,但我无法正确使用它。在我的尝试中,我没有获得任何速度上的改进。
我制作了一个简单的笔记本来复制我正在做的事情:

https://gist.github.com/epifanio/72a48ca970a4291b293851ad29eadb50

我的问题是...使用dask完成我的用例的正确方法是什么?
2个回答

4

Pandas

在Pandas中,我会使用apply方法

In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 2, 1]})

In [3]: def makegeom(row):
   ...:      a, b = row
   ...:      return 'Point(%s %s)' % (a, b)
   ...: 

In [4]: df.apply(makegeom, axis=1)
Out[4]: 
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)
dtype: object

Dask.dataframe

在dask.dataframe中,您可以做同样的事情。
In [5]: import dask.dataframe as dd

In [6]: ddf = dd.from_pandas(df, npartitions=2)

In [7]: ddf.apply(makegeom, axis=1).compute()
Out[7]: 
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)

添加新系列

在任何情况下,您都可以将新系列添加到数据帧中。

df['geom'] = df[['a', 'b']].apply(makegeom)

创建

如果您有CSV数据,那么我建议使用dask.dataframe.read_csv函数。

ddf = dd.read_csv('filenames.*.csv')

如果您有其他类型的数据,我会使用dask.delayed

我现在正在尝试使用delayed: https://gist.github.com/anonymous/34816085f0dfc2e26a5130a59aa920c1,但它仍然在工作... 'htop'显示所有CPU都在工作,但不是100%。我看到8个进程在以约15%的速度工作,我将尝试您的方法。我的数据存储为feather二进制文件(在示例中,我使用csv来简化我的用例)。 - epifanio
1
你的makegeom函数受GIL限制。你应该阅读http://dask.readthedocs.io/en/latest/scheduler-choice.html,了解在你的情况下如何选择一个好的调度器。 - MRocklin
1
我仍在解决我的问题。我更改了makegeom函数,用自定义的numpy代码(速度更快)替换了apply。现在我正在处理一个新笔记本。我的计划是先学习一些关于队列和进程之间的“共享对象”,然后了解如何使用dask和分布式。 - epifanio

1
同时,我已经找到了其他方法(替代Dask),在我看来相对较容易,可以并行地执行函数func。在这两种情况下,我都利用了numpy.array_split方法。
其中一种方法使用了Python的multiprocessing.Poolnumpy.array_splitpandas.concat的组合,运行方式如下:
import numpy as np

def func(array):
    # do some computation on the given array
    pass

def parallelize_dataframe(df, func, n_cores=72):
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

另一种方法是使用功能强大但简单的ray集群(如果您可以在多台机器上运行代码,则非常有用):

# connect to a ray cluster
# 

import ray

ray.init(address="auto", redis_password="5241590000000000")

import numpy as np


@ray.remote
def func(df):
    # do some computation on the given dataframe
    pass

df_split = np.array_split(df, 288)
result = pd.concat(ray.get([func.remote(i) for i in df_split]))

上述方法对于简单的方法func而言效果非常好,其中的计算可以使用numpy进行,并将返回的结果连接回pandas数据框中。对于执行简单文件操作的方法,我也发现parmap.map很有用 - 但这与本S.O.问题无关。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接