利用Pyspark dataframe创建字典时出现了OutOfMemoryError:Java heap space

3

我看过并尝试了许多关于这个问题的现有 StackOverflow 帖子,但都不起作用。我猜我的JAVA堆空间对于我的大数据集来说并不像预期的那样大,我的数据集包含6.5M行。我的Linux实例带有64GB RAM和4个核心。根据这个建议,我需要修复我的代码,但我认为从pyspark dataframe创建字典不应该很昂贵。请告诉我是否有其他计算方法。

我只想从我的pyspark dataframe中制作一个Python字典,这是我的pyspark dataframe的内容,

property_sql_df.show() 显示,

+--------------+------------+--------------------+--------------------+
|            id|country_code|       name|          hash_of_cc_pn_li|
+--------------+------------+--------------------+--------------------+
|  BOND-9129450|          US|Scotron Home w/Ga...|90cb0946cf4139e12...|
|  BOND-1742850|          US|Sited in the Mead...|d5c301f00e9966483...|
|  BOND-3211356|          US|NEW LISTING - Com...|811fa26e240d726ec...|
|  BOND-7630290|          US|EC277- 9 Bedroom ...|d5c301f00e9966483...|
|  BOND-7175508|          US|East Hampton Retr...|90cb0946cf4139e12...|
+--------------+------------+--------------------+--------------------+

我想要的是以 hash_of_cc_pn_li 作为,id 作为列表值的字典。 预期输出
{
  "90cb0946cf4139e12": ["BOND-9129450", "BOND-7175508"]
  "d5c301f00e9966483": ["BOND-1742850","BOND-7630290"]
}

到目前为止我尝试过的:

%%time
duplicate_property_list = {}
for ind in property_sql_df.collect(): 
     hashed_value = ind.hash_of_cc_pn_li
     property_id = ind.id
     if hashed_value in duplicate_property_list:
         duplicate_property_list[hashed_value].append(property_id) 
     else:
         duplicate_property_list[hashed_value] = [property_id] 

现在在控制台上显示的内容:

java.lang.OutOfMemoryError: Java heap space

并且在Jupyter笔记本输出上显示此错误。

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33097)

不确定这是否有帮助,但作为第一步,您可以尝试将列表集合分发为 df.groupby("hash_of_cc").agg(collect_list("id")).show()。然后再仔细查看并决定是否真的需要将完整的数据框收集到驱动程序中。 - mazaneicha
4个回答

2
从pyspark dataframe中制作字典不应该非常昂贵,这在运行时是正确的,但这很容易占用大量空间。特别是如果您执行property_sql_df.collect(),此时您将整个数据框加载到驱动程序内存中。如果每行有10KB或10K个字符,则在6.5M行处,您将已经达到65GB,并且我们还没有开始制作字典。首先,您可以仅收集所需的列(例如不包括name)。其次,您可以在Spark上游进行聚合,这将节省一些空间,具体取决于每个hash_of_cc_pn_li中有多少个id。
rows = property_sql_df.groupBy("hash_of_cc_pn_li") \
  .agg(collect_set("id").alias("ids")) \
  .collect()

duplicate_property_list = { row.hash_of_cc_pn_li: row.ids for row in rows }

感谢您详细的回答,先生。 - A l w a y s S u n n y
只是一个快速的问题,当我尝试使用df.toPandas()将我的pyspark数据框转换为pandas时,出现了相同的错误,这种情况下我该怎么办? - A l w a y s S u n n y
1
df.toPandas() 从占用空间的角度来看甚至不如 df.collect()。然而,它们替代方案的选择取决于后续你要做什么。比如说,如果目的只是将这些映射存储为csv文件,那么你可以直接从数据框 (使用 collect_list / collect_set) 并通过 df.coalesce(1).write.option('sep',';').csv(path) 将其保存到文件中。 - Eric Doi
没有运气,同样的问题仍然存在。 - A l w a y s S u n n y
跟进问题:https://dev59.com/_bzpa4cB1Zd3GeqPKWh2 - A l w a y s S u n n y

1

1
这里是如何使用您的数据创建一个示例DataFrame的方法:
data = [
    ("BOND-9129450", "90cb"),
    ("BOND-1742850", "d5c3"),
    ("BOND-3211356", "811f"),
    ("BOND-7630290", "d5c3"),
    ("BOND-7175508", "90cb"),
]
df = spark.createDataFrame(data, ["id", "hash_of_cc_pn_li"])

让我们在Spark DataFrame中聚合数据,以限制在驱动节点上收集的行数。 我们将使用在quinn中定义的two_columns_to_dictionary函数创建字典。

agg_df = df.groupBy("hash_of_cc_pn_li").agg(F.max("hash_of_cc_pn_li").alias("hash"), F.collect_list("id").alias("id"))
res = quinn.two_columns_to_dictionary(agg_df, "hash", "id")
print(res) # => {'811f': ['BOND-3211356'], 'd5c3': ['BOND-1742850', 'BOND-7630290'], '90cb': ['BOND-9129450', 'BOND-7175508']}

这可能适用于相对较小的650万行数据集,但不适用于大型数据集。"我认为从pyspark dataframe创建字典不应该非常昂贵"仅适用于真正微小的DataFrames。从PySpark DataFrame创建字典实际上非常昂贵。
PySpark是一个集群计算框架,受益于将数据分布在集群中的节点上。当您调用collect时,所有数据都会移动到驱动程序节点,工作节点不会起到帮助作用。每当尝试将过多数据移动到驱动程序节点时,您将收到OutOfMemory异常。
最好完全避免使用字典,并找出解决问题的不同方法。好问题。

感谢您详细的回答,先生。 - A l w a y s S u n n y
只是一个简单的问题,当我尝试将我的pyspark dataframe转换为pandas时,出现了相同的错误,请问我该怎么办? - A l w a y s S u n n y
1
@AlwaysSunny - 是的,toPandas也会在驱动节点上收集所有数据,除非数据集很小,否则应该避免使用。 - Powers
好的,我明白您的意思。但是即使只有5000条简单的记录,它也会导致相同的错误。 - A l w a y s S u n n y
跟进问题:https://dev59.com/_bzpa4cB1Zd3GeqPKWh2 - A l w a y s S u n n y
显示剩余2条评论

1

Spark-2.4开始,我们可以使用内置函数groupBy,collect_list,map_from_arrays,to_json来处理此类情况。

示例:

df.show()
#+------------+-----------------+
#|          id| hash_of_cc_pn_li|
#+------------+-----------------+
#|BOND-9129450|90cb0946cf4139e12|
#|BOND-7175508|90cb0946cf4139e12|
#|BOND-1742850|d5c301f00e9966483|
#|BOND-7630290|d5c301f00e9966483|
#+------------+-----------------+
df.groupBy(col("hash_of_cc_pn_li")).\
agg(collect_list(col("id")).alias("id")).\
selectExpr("to_json(map_from_arrays(array(hash_of_cc_pn_li),array(id))) as output").\
show(10,False)
#+-----------------------------------------------------+
#|output                                               |
#+-----------------------------------------------------+
#|{"90cb0946cf4139e12":["BOND-9129450","BOND-7175508"]}|
#|{"d5c301f00e9966483":["BOND-1742850","BOND-7630290"]}|
#+-----------------------------------------------------+

使用collect_list聚合另一个agg以获取one dict
df.groupBy(col("hash_of_cc_pn_li")).\
agg(collect_list(col("id")).alias("id")).\
agg(to_json(map_from_arrays(collect_list(col("hash_of_cc_pn_li")),collect_list(col("id")))).alias("output")).\
show(10,False)
#+---------------------------------------------------------------------------------------------------------+
#|output                                                                                                   |
#+---------------------------------------------------------------------------------------------------------+
#|{"90cb0946cf4139e12":["BOND-9129450","BOND-7175508"],"d5c301f00e9966483":["BOND-1742850","BOND-7630290"]}|
#+---------------------------------------------------------------------------------------------------------+

感谢您详细的回答,先生。 - A l w a y s S u n n y
我正在使用Spark 3.0,因此可以使用collect_list函数。 - A l w a y s S u n n y
只是一个快速的问题,当我尝试使用df.toPandas()将我的pyspark数据框转换为pandas时,出现了相同的错误,这种情况下我该怎么办? - A l w a y s S u n n y
1
@AlwaysSunny,尽量避免转换为pandas dataframe,如果可能,请使用spark内置函数来获得期望的输出。 pandas会收集数据并导致驱动程序失败。 https://dev59.com/HZvga4cB1Zd3GeqPzEP_ - notNull
没有运气,同样的问题仍然出现。 - A l w a y s S u n n y
跟进问题:https://dev59.com/_bzpa4cB1Zd3GeqPKWh2 - A l w a y s S u n n y

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接