PySpark: 在 join 后 count() 的结果不一致

3
我完全不理解以下问题:
当我连接两个数据框并返回行数时,每次尝试都会得到略微不同的计数。以下是详细信息:
我想连接数据框:'df_user_ids' 和 'df_conversions':
df_user_ids.show()
>>>
+--------------------+
|             user_id|
+--------------------+
|AMsySZY-cqcufnXst...|
|AMsySZY1Oo75A6vKU...|
|AMsySZY4nbqZiuEMR...|
|AMsySZY5RSfgj6Xvi...|
|AMsySZY5geAmTx0er...|
|AMsySZY6Gskv_kEAv...|
|AMsySZY6MIOyPWM4U...|
|AMsySZYCEZYS00UB9...| 

df_conversions.show()
>>>
+--------------------+----------------------+---------+
|             user_id|time_activity_observed|converted|
+--------------------+----------------------+---------+
|CAESEAl1YPOZpaWVx...|   2018-03-23 12:15:37|        1|
|CAESEAuvSBzmfc_f3...|   2018-03-23 21:58:25|        1|
|CAESEBXWsSYm4ntvR...|   2018-03-30 12:16:53|        1|
|CAESEC-5uPwWGFdnv...|   2018-03-23 08:52:48|        1|
|CAESEDB3Z-NNvz7zL...|   2018-03-24 21:37:05|        1|
|CAESEDu7S7rGTVlj2...|   2018-04-01 17:00:12|        1|
|CAESEE4s6g1-JlUEt...|   2018-03-23 19:32:23|        1|
|CAESEELlJt0mE2xjn...|   2018-03-24 18:26:15|        1|

两个数据框都有名为“user_id”的关键列, 并且都使用了固定的种子通过“.sampleBy()”创建:

.sampleBy("converted", fractions={0: 0.035, 1: 1}, seed=0)    

在合并数据帧之前,我会将它们保存到磁盘中以持久化:
df_user_ids.persist(StorageLevel.DISK_ONLY)
df_conversions.persist(StorageLevel.DISK_ONLY) 

然后我验证两个数据框的行数是否一致:

df_user_ids.count()
>>> 584309

df_user_ids.count()
>>> 584309

df_conversions.count()
>>> 5830

df_conversions.count()
>>> 5830

请检查两个数据框的键列是否不包含重复值:

df_user_ids.count()
>>> 584309

df_user_ids.select('user_id').distinct().count()
>>> 584309

df_conversions.count()
>>> 5830

df_conversions.select('user_id').distinct().count()
>>> 5830

然后当我将它们连接起来时,行数不一致!

df_user_ids.join(df_conversions, ["user_id"], "left").count()
>>> 584314

df_user_ids.join(df_conversions, ["user_id"], "left").count()
>>> 584317

df_user_ids.join(df_conversions, ["user_id"], "left").count()
>>> 584304

这怎么可能?有时,这个“joined count”比“df_user_ids.count()”还要高,而有时却比它低。我在 AWS EMR 上的 EMR 集群中使用了 Zeppelin 笔记本来运行此代码。
我已经尝试了下面链接中建议的方法:
- “.persist(StorageLevel.DISK_ONLY)”并没有起到作用。 - 我没有使用 monotonically_increasing_id。
参考链接:spark inconsistency when running count command

1
sampleBy不能保证您获得精确的行分数。它使用每个记录被包括的概率等于分数的样本,并且可能因运行而异。 - Steven
你确定要在采样结果上调用persist方法吗? - Arnon Rotem-Gal-Oz
@ArnonRotem-Gal-Oz 是的,它们在采样后被持久化了。 - Johnny M
1个回答

0
通过观察您在DataFrame上执行的一系列操作,我认为问题是由于Join引起的。Join操作会导致洗牌,每个节点都会与其他节点通信,并根据哪个节点具有某个键或一组键(您正在连接的键)来共享数据。在跨执行器共享数据时,如果执行器没有将DataFrame持久化到磁盘上,则它将重新计算DAG,并且无法保证sampleBy返回DataFrame中相同行的相同分数。

那么在sampleBy上设置种子值并持久化结果可能不足以生成一致的结果? - Johnny M
1
我发现如果我不先持久化输出,使用 pyspark.sql.functions.explode 也会导致输出数据框的 count() 不一致。 - panc

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接