Spark:read.jdbc(..numPartitions..)和repartition(..numPartitions..)中的numPartitions之间的区别

11

我对以下方法中numPartitions参数的行为感到困惑:

  1. DataFrameReader.jdbc
  2. Dataset.repartition

官方文档中关于DataFrameReader.jdbcnumPartitions参数有以下说明:

numPartitions:分区数。与 lowerBound(包含)和 upperBound(不包含)一起,形成用于平均拆分 columnName 的 WHERE 子句表达式的分区步幅。

Dataset.repartition官方文档则说明:

返回一个具有确切 numPartitions 个分区的新数据集。


我的现有理解:

  1. DataFrameReader.jdbc 方法中的 numPartition 参数控制从数据库读取数据的并行度。
  2. Dataset.repartition 中的 numPartition 参数控制当将此 DataFrame 写入磁盘时生成的输出文件数量。

我的问题:

  1. 如果我通过 DataFrameReader.jdbc 读取 DataFrame,然后将其写入磁盘(不调用 repartition 方法),那么输出文件中是否仍会有与在对其调用 repartition 后写出的 DataFrame 相同数量的文件?
  2. 如果上述问题的答案是:
    • 是:那么在使用 DataFrameReader.jdbc 方法(带有 numPartitions 参数)读取的 DataFrame 上调用 repartition 方法是否多余?
    • 否:请纠正我理解上的错误。此外,在这种情况下,DataFrameReader.jdbc 方法的 numPartitions 参数不应该被称为类似于“并行度”的东西吗?
1个回答

17

简短回答:在这两种方法中,numPartitions参数的行为几乎没有区别


read.jdbc(..numPartitions..)

这里,numPartitions 参数控制着:
  1. 建立到 MySQL(或其他 RDBMS)的并行连接数,用于将数据读入 DataFrame
  2. 在所有后续操作中对读取的 DataFrame 进行并行度设置,包括写入磁盘,直到在其上调用repartition 方法。

repartition(..numPartitions..)

这里的numPartitions参数控制了DataFrame在执行任何操作(包括写入磁盘)时所表现出的并行度


所以基本上,通过使用spark.read.jdbc(..numPartitions..)方法从MySQL表读取得到的DataFrame在执行操作时表现出与未使用并行性读取并随后调用repartition(..numPartitions..)方法(显然使用相同的numPartitions值)后的行为相同(表现出相同的并行度)。

回答确切的问题:

如果我通过DataFrameReader.jdbc读取DataFrame,然后将其写入磁盘(不调用repartition方法),那么输出文件中是否仍会有与在对DataFrame进行repartition后写出到磁盘时一样多的文件?

是的。

假设已经通过提供适当的参数(columnNamelowerBoundupperBoundnumPartitions)将读取任务并行化,则对生成的DataFrame 包括写操作将并行执行。引用official docs中的说明如下:

numPartitions:表读取和写入并行性可使用的最大分区数。这还确定了最大并发JDBC连接数。如果要写入的分区数超过此限制,则通过调用coalesce(numPartitions)将其减少到此限制之前进行写入。


是的,对于使用DataFrameReader.jdbc方法(带有numPartitions参数)读取的DataFrame调用repartition方法是否多余?除非您调用其他变体的repartition方法(那些带有columnExprs参数的方法),否则在这样的DataFrame上调用repartition(具有相同的numPartitions参数)是多余的。但是,我不确定在已经并行化的DataFrame上强制使用相同的并行度是否也会不必要地在执行器之间重新分配数据。一旦我找到答案,就会更新答案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接