使用Pandas小型数据框架合并一个大型Dask数据框架。

26

参考这里的示例:YouTube: Dask-Pandas Dataframe Join,我尝试将一个大约 70GB 的 Dask 数据框与一个作为 Pandas 数据框加载的大约 24MB 的数据框合并。

合并基于两个列 A 和 B,我没有设置任何索引:

import dask.dataframe as dd
from dask.diagnostics import ProgressBar

small_df = pd.read_csv(dataframe1) # as pandas
large_df = dd.read_csv(dataframe2) #as dask.dataframe

df2 = large_df.merge(small_df, how='left', left_on=leftcolumns, right_on=rightcolumns) #do the merge

A = df2[df2['some column'] == 'somevalue'] #do a reduction that would fit on my computer

pbar = ProgressBar()
pbar.register()

result = A.compute()

我正在使用一台装有16GB内存和4个核心的Windows计算机。我使用进度条来评估合并过程的进度。昨晚我把它留了一整夜,今天早上重启后,目前已经过去了大约半个小时,但进展仍然是0%。

谢谢您的帮助,我很感激。

更新

我在我的8GB内存的Mac上尝试了一下,效果还不错。我相信我安装了Anaconda自带的Dask分发程序。无论如何,我没有做任何不同的操作。

我分享了我按照上述编码所花费的时间和结果(21分钟):

In [26]: C = result1.compute()
[########################################] | 100% Completed | 21min 13.4s
[########################################] | 100% Completed | 21min 13.5s
[########################################] | 100% Completed | 21min 13.6s
[########################################] | 100% Completed | 21min 13.6s

更新 2

我在我的Windows电脑上升级到Dask的最新版本,它运行良好。


1
只是一点小建议,使用df而不是df2在布尔索引中是否是一个错误?A = df2[df ['some column'] == 'somevalue'] - Khris
哦,抱歉,我刚看到你的意思。是的,那是一个错误。我已经修复了。谢谢! - dleal
4
@dleal,既然这个问题已经解决了,你能否将解决方案作为答案提供并接受它(即使是你自己问的问题也应该鼓励这样做),以便将其从未回答的列表中删除?谢谢! - Garrett
1
补充@Garrett的评论:请具体说明您使用的Dask版本以及您刚刚更新到了哪个版本。 - vmg
你也可以使用joblib,在多个核心上运行该过程。总体思路是将大型数据集分割为一系列较小的数组(即[large_df_part1,...,large_df_partN]),然后使用joblib将它们分配给不同的处理器。 - Quantum_Something
使用BigQuery处理数据非常方便,它提供了丰富的资源和SQL查询功能,让数据处理变得十分轻松。 - Maryam
3个回答

1
你可以遍历唯一相等的值并使用循环分配其他列:
unioun_set = list(set(small_df['common_column']) & set(large_df['common_column']))
for el in union_set:
    for column in small_df.columns:
        if column not in large_df.columns:
            large_df.loc[large_df['common_column'] == el,column] = small_df.loc[small_df['common_column'] ==  el,column]



-1
在处理大数据时,分区数据非常重要,同时具有足够的集群和内存大小是必需的。
您可以尝试使用 Spark
DASK 是一个纯 Python 框架,它能更多地实现相同的功能,即允许您在本地或集群上运行相同的 Pandas 或 NumPy 代码。而 Apache Spark 则带来了一个新的 API 和执行模型的学习曲线,但具有 Python 包装器。
您可以尝试将数据分区并存储到 parquet 文件中。

这并没有以有意义的方式回答问题。 - rrpelgrim

-1
你可以使用Dask将一个大的DataFrame与一个较小的pandas DataFrame连接在一起。
下面的代码创建了一个具有多个分区的Dask DataFrame,并执行了一个左连接操作,连接了一个小的pandas DataFrame:
import dask.dataframe as dd
import pandas as pd

# create sample 'large' pandas dataframe
df_large = pd.DataFrame(
    {
        "Name": ["Azza", "Brandon", "Cedric", "Devonte", "Eli", "Fabio"], 
        "Age": [29, 30, 21, 57, 32, 19]
    }
)

# create multi-partition dask dataframe from pandas
large = dd.from_pandas(df_large, npartitions=2)

# create sample small pandas dataframe
small = pd.DataFrame(
    {
        "Name": ["Azza", "Cedric", "Fabio"], 
        "City": ["Beirut", "Dublin", "Rosario"]
    }
)

# merge dask dataframe to pandas dataframe
join = ddf.merge(df2, how="left", on=["Name"])

# inspect results
join.compute()

我写了一篇关于合并Dask DataFrames的博客文章在这里,可能对你有所帮助。特别是有关设置索引和排序与未排序连接的注意事项,可能有助于加快计算速度。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接