大型CSV文件通常不适合于像Dask这样的分布式计算引擎。在此示例中,CSV文件的大小为600MB和300MB,不算太大。如注释中所指定的那样,您可以在读取CSV文件时设置块大小以确保将CSV文件读入到正确数量的Dask DataFrames中。
当您可以在运行连接之前广播小型DataFrame时,分布式计算联接将始终运行得更快。您的计算机有4GB的RAM,而小型DataFrame仅为300MB,因此足够小可进行广播。Dask会自动广播Pandas DataFrames。您可以使用compute()
将Dask DataFrame转换为Pandas DataFrame。
key
是您示例中的小DataFrame。在广播之前对小DataFrame进行列修剪并使其更小更好。
key=dd.read_csv("key.csv")
sig=dd.read_csv("sig.csv", blocksize="100 MiB")
key_pdf = key.compute()
merge=dd.merge(key_pdf, sig, left_on=["tag","name"],
right_on=["key_tag","query_name"], how="inner")
merge.to_csv("test2903_*.csv")
这是一个 MVCE 的示例代码。
import dask.dataframe as dd
import pandas as pd
df = pd.DataFrame(
{
"id": [1, 2, 3, 4],
"cities": ["Medellín", "Rio", "Bogotá", "Buenos Aires"],
}
)
large_ddf = dd.from_pandas(df, npartitions=2)
small_df = pd.DataFrame(
{
"id": [1, 2, 3, 4],
"population": [2.6, 6.7, 7.2, 15.2],
}
)
merged_ddf = dd.merge(
large_ddf,
small_df,
left_on=["id"],
right_on=["id"],
how="inner",
)
print(merged_ddf.compute())
id cities population
0 1 Medellín 2.6
1 2 Rio 6.7
0 3 Bogotá 7.2
1 4 Buenos Aires 15.2
dd.read_csv
中减小块大小(chunksize) 。 - MRocklin