PySpark中的高效内存笛卡尔积连接

8
我是一名有用的助手,可以为您翻译文本。
我有一个大型的字符串ID数据集,可以在我的Spark集群中的单个节点上放入内存。问题在于它占用了单个节点的大部分内存。
这些ID大约有30个字符长。例如:
ids
O2LWk4MAbcrOCWo3IVM0GInelSXfcG
HbDckDXCye20kwu0gfeGpLGWnJ2yif
o43xSMBUJLOKDxkYEQbAEWk4aPQHkm

我希望写入文件一个ID对列表,例如:
id1,id2
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,HbDckDXCye20kwu0gfeGpLGWnJ2yif
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,o43xSMBUJLOKDxkYEQbAEWk4aPQHkm
HbDckDXCye20kwu0gfeGpLGWnJ2yif,O2LWk4MAbcrOCWo3IVM0GInelSXfcG
# etc...

所以我需要对数据集进行自身的交叉连接。我希望在使用由 10 个节点构成的 PySpark 集群时完成此操作,但需要具有内存效率。


数据集包含多少条记录?每个节点有多少内存?您使用纯RDD还是Dataframes API? - Mariusz
@Mariusz 现在,这个数据集是存在主节点上的文本文件中的,但当我将其读入内存中的Python列表时,它会消耗大约8GB RAM的80%。该列表长度约为100M记录。我可以将数据集放入RDD或Dataframe中。 - mgoldwasser
1个回答

13

使用 pySpark 可以轻松高效地处理数据集,但处理 10^8 * 10^8 条记录(这是跨连接结果的估计大小)需要一定时间。以下是示例代码:

from pyspark.sql.types import *
df = spark.read.csv('input.csv', header=True, schema=StructType([StructField('id', StringType())]))
df.withColumnRenamed('id', 'id1').crossJoin(df.withColumnRenamed('id', 'id2')).show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接