Dask.dataframe:合并和分组时内存不足

7

我对Dask不太熟悉,遇到了一些问题。

我的机器配置是4GB内存和2个核心,我要分析两个csv文件(key.csv:大约300MB,有200万行;sig.csv:大约600MB,有1200万行)。使用pandas时,数据无法全部装入内存,因此我转而使用Dask.dataframe。我期望Dask会将数据分成小块并逐步处理(速度慢一些也没关系,只要能正常工作即可),但是不知怎么的,Dask仍然占用了全部内存。

以下是我的代码:

    key=dd.read_csv("key.csv")
    sig=dd.read_csv("sig.csv")
  
    merge=dd.merge(key, sig, left_on=["tag","name"],
        right_on=["key_tag","query_name"], how="inner")
    merge.to_csv("test2903_*.csv") 
    # store results into  a hard disk since it can't be fit in memory

我有没有犯错?任何帮助都将不胜感激。


4
你可以尝试在dd.read_csv中减小块大小(chunksize) 。 - MRocklin
1
你可以尝试使用Dask Distributed,并带上"--memory-limit=auto" option选项来运行它。 - Vlad Frolov
1
@MRocklin:谢谢,它可以工作。 - Tho Le Phuoc
1个回答

0

大型CSV文件通常不适合于像Dask这样的分布式计算引擎。在此示例中,CSV文件的大小为600MB和300MB,不算太大。如注释中所指定的那样,您可以在读取CSV文件时设置块大小以确保将CSV文件读入到正确数量的Dask DataFrames中。

当您可以在运行连接之前广播小型DataFrame时,分布式计算联接将始终运行得更快。您的计算机有4GB的RAM,而小型DataFrame仅为300MB,因此足够小可进行广播。Dask会自动广播Pandas DataFrames。您可以使用compute()将Dask DataFrame转换为Pandas DataFrame。

key是您示例中的小DataFrame。在广播之前对小DataFrame进行列修剪并使其更小更好。

key=dd.read_csv("key.csv")
sig=dd.read_csv("sig.csv", blocksize="100 MiB")

key_pdf = key.compute()
  
merge=dd.merge(key_pdf, sig, left_on=["tag","name"],
        right_on=["key_tag","query_name"], how="inner")
merge.to_csv("test2903_*.csv")

这是一个 MVCE 的示例代码。
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 2, 3, 4],
        "cities": ["Medellín", "Rio", "Bogotá", "Buenos Aires"],
    }
)
large_ddf = dd.from_pandas(df, npartitions=2)

small_df = pd.DataFrame(
    {
        "id": [1, 2, 3, 4],
        "population": [2.6, 6.7, 7.2, 15.2],
    }
)

merged_ddf = dd.merge(
    large_ddf,
    small_df,
    left_on=["id"],
    right_on=["id"],
    how="inner",
)

print(merged_ddf.compute())

   id        cities  population
0   1      Medellín         2.6
1   2           Rio         6.7
0   3        Bogotá         7.2
1   4  Buenos Aires        15.2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接