在pyspark中,为什么使用`limit`后跟`repartition`会创建完全相等的分区大小?

5
根据pyspark文档repartition应该使用哈希分区,这会导致分区大小略有不同。然而,我发现在limit之前使用它将产生完全相等的分区大小。可以通过在pyspark shell中运行以下命令来展示:
df = spark.createDataFrame([range(5)] * 100)

def count_part_size(part_iter):
    yield len(list(part_iter))

print(df.repartition(20).rdd.mapPartitions(count_part_size).collect())
# [4, 4, 4, 5, 4, 4, 5, 4, 5, 6, 6, 6, 7, 5, 5, 5, 5, 6, 5, 5]

print(df.limit(100).repartition(20).rdd.mapPartitions(count_part_size).collect())
# [5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]

如果repartition使用哈希分区器,为什么会在这种情况下产生完全相等的分区大小?如果它没有使用哈希分区器,那么它使用的是什么类型的分区器?
顺便说一下,我正在使用Python版本2.7.15和Spark版本2.0.2。
1个回答

4
这里有四个因素:
  • 如果没有提供分区表达式,repartition 不使用 HashPartitioning,或者更准确地说,它不会直接使用它。相反,它使用 RoundRobinPartitioning(您可能已经猜到)

    从一个随机分区开始,均匀地分配元素到输出分区中。

    在内部,它在每个分区上生成一系列scala.Int从一个随机点开始。只有这些值通过HashPartitioner

  • 它的工作方式是因为Int hashCode 只是身份识别-换句话说

    ∀x∈Int x = hashCode(x)

    (这是BTW与 Scala Int 范围内的 CPython hash 的相同行为- -2147483648 到 2147483647. 这些哈希值根本不是设计用于加密安全) 因此,将HashPartitioner 应用于一系列Int 值会导致实际的 Round Robin 分配。

    因此,在这种情况下,HashPartitioner 仅作为模运算符工作。

  • 在重分区之前对所有值进行了LIMIT,因此所有值都被混洗到一个节点中。因此,只有一个Int 值序列被使用。

  • 分区数是数据集大小的除数。由于数据可以均匀地分布在各个分区中,因此数据可以均匀地分布在各个分区中。

总体而言,这是预期行为(每个分区应该均匀分布在输出分区中)、管道属性(只有一个输入分区)和数据(数据集可以均匀分布)的组合。


1
简明扼要的解释。 - thebluephantom
太棒了,感谢您详细的回答。我有一个后续问题要问您:是否有一种方法可以使这种分区变得确定性,以便无论 Spark 应用程序在何处运行,它都会将相同的行发送到相同的分区? - Isaac
不,首先进程中有一个不可配置的随机初始化。其次,它取决于值的顺序,这本身通常是不确定的。如果您想要确定性分区成员资格,则应提供确定性分区表达式。 - 10465355

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接