将大型Pyspark数据框创建字典时显示OutOfMemoryError:Java堆空间。

3

我看过并尝试了许多关于此问题的现有StackOverflow帖子,但都没有起作用。我猜我的JAVA堆空间对于我的大数据集来说不够大,我的数据集包含650万行。我的Linux实例具有64GB RAM和4个核心。根据这个建议,我需要修复我的代码,但我认为从pyspark dataframe创建字典不应该很昂贵。请告诉我是否还有其他计算方法。

我只想从我的pyspark dataframe中创建一个Python字典,这是我的pyspark dataframe的内容,

property_sql_df.show()显示,

+--------------+------------+--------------------+--------------------+
|            id|country_code|       name|          hash_of_cc_pn_li|
+--------------+------------+--------------------+--------------------+
|  BOND-9129450|          US|Scotron Home w/Ga...|90cb0946cf4139e12...|
|  BOND-1742850|          US|Sited in the Mead...|d5c301f00e9966483...|
|  BOND-3211356|          US|NEW LISTING - Com...|811fa26e240d726ec...|
|  BOND-7630290|          US|EC277- 9 Bedroom ...|d5c301f00e9966483...|
|  BOND-7175508|          US|East Hampton Retr...|90cb0946cf4139e12...|
+--------------+------------+--------------------+--------------------+

我想要的是用hash_of_cc_pn_li作为,id作为列表值来创建一个字典。

期望输出

{
  "90cb0946cf4139e12": ["BOND-9129450", "BOND-7175508"]
  "d5c301f00e9966483": ["BOND-1742850","BOND-7630290"]
}

迄今为止我尝试过的方法:

方法1:导致java.lang.OutOfMemoryError:Java堆空间不足

%%time
duplicate_property_list = {}
for ind in property_sql_df.collect(): 
     hashed_value = ind.hash_of_cc_pn_li
     property_id = ind.id
     if hashed_value in duplicate_property_list:
         duplicate_property_list[hashed_value].append(property_id) 
     else:
         duplicate_property_list[hashed_value] = [property_id] 

方法2:由于缺少pyspark的本地OFFSET而无法工作。

%%time
i = 0
limit = 1000000
for offset in range(0, total_record,limit):
    i = i + 1
    if i != 1:
        offset = offset + 1
        
    duplicate_property_list = {}
    duplicate_properties = {}
    
    # Preparing dataframe
    url = '''select id, hash_of_cc_pn_li from properties_df LIMIT {} OFFSET {}'''.format(limit,offset)  
    properties_sql_df = spark.sql(url)
    
    # Grouping dataset
    rows = properties_sql_df.groupBy("hash_of_cc_pn_li").agg(F.collect_set("id").alias("ids")).collect()
    duplicate_property_list = { row.hash_of_cc_pn_li: row.ids for row in rows }
    
    # Filter a dictionary to keep elements only where duplicate cound
    duplicate_properties = filterTheDict(duplicate_property_list, lambda elem : len(elem[1]) >=2)
    
    # Writing to file
    with open('duplicate_detected/duplicate_property_list_all_'+str(i)+'.json', 'w') as fp:
        json.dump(duplicate_property_list, fp)

现在在控制台上看到的信息:

java.lang.OutOfMemoryError: Java heap space

并在Jupyter笔记本输出中显示此错误。

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33097)

这是我在这里提出的后续问题: 从Pyspark数据框创建字典,显示OutOfMemoryError:Java堆空间


这个解决方案不起作用吗..? - anky
2
这是一个 XY 问题。你似乎相信你的设计(在驱动程序上生成一个巨大的字典来收集数据)应该可以工作。这只是解决方案的中间步骤,我们对此没有任何想法(而且你的解决方案显然不可扩展)。你需要添加更多信息:1. 你打算用字典做什么(不能在 worker 上完成的任务)2. 你的 Spark 内存设置是什么(以及在 OOM 发生之前实际使用了多少 64GB 的内存)3. 你的 65m 行需要多少内存4. 在 property_sql_df 之前/之后的任何转换。 - ernest_k
@ anky 没有先生,我尝试过了,但没有成功,还是内存错误。 - A l w a y s S u n n y
1个回答

1
为什么不将尽可能多的数据和处理留在执行器中,而不是收集到驱动程序中?如果我理解正确,您可以使用 pyspark 转换和聚合,直接保存为 JSON,从而利用执行器,然后将该 JSON 文件(可能分区)作为字典重新加载回 Python。诚然,这会引入 IO 开销,但这应该可以帮助您避免 OOM 堆空间错误。步骤如下:
import pyspark.sql.functions as f


spark = SparkSession.builder.getOrCreate()
data = [
    ("BOND-9129450", "90cb"),
    ("BOND-1742850", "d5c3"),
    ("BOND-3211356", "811f"),
    ("BOND-7630290", "d5c3"),
    ("BOND-7175508", "90cb"),
]
df = spark.createDataFrame(data, ["id", "hash_of_cc_pn_li"])

df.groupBy(
    f.col("hash_of_cc_pn_li"),
).agg(
    f.collect_set("id").alias("id")  # use f.collect_list() here if you're not interested in deduplication of BOND-XXXXX values
).write.json("./test.json")

检查输出路径:

ls -l ./test.json

-rw-r--r-- 1 jovyan users  0 Jul 27 08:29 part-00000-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 50 Jul 27 08:29 part-00039-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 65 Jul 27 08:29 part-00043-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users 65 Jul 27 08:29 part-00159-1fb900a1-c624-4379-a652-8e5b9dee8651-c000.json
-rw-r--r-- 1 jovyan users  0 Jul 27 08:29 _SUCCESS
_SUCCESS

将其翻译为中文:

将其作为Python dict导入:

import json
from glob import glob

data = []
for file_name in glob('./test.json/*.json'):
    with open(file_name) as f:
        try:
            data.append(json.load(f))
        except json.JSONDecodeError:  # there is definitely a better way - this is here because some partitions might be empty
            pass

最后

{item['hash_of_cc_pn_li']:item['id'] for item in data}

{'d5c3': ['BOND-7630290', 'BOND-1742850'],
 '811f': ['BOND-3211356'],
 '90cb': ['BOND-9129450', 'BOND-7175508']}

我希望这可以帮到你!感谢你提出这个好问题!

终于成功了,只需要修改 将加载转换为Python字典的部分:,非常感谢您。 - A l w a y s S u n n y

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接