从另一个DataFrame进行谓词下推筛选数据框架

4

我该如何将一个基于另一个数据框的筛选器应用到读取的数据框中?基本上,我想避免完全读取第二个数据框,然后执行内部连接。相反,我希望只需提交一个筛选器即可在源处进行筛选。即使我使用了内部连接包装读取,计划也没有显示它被筛选了。我觉得肯定有更好的设置方法。我目前使用Spark 2.x,但我想避免像下面这样收集一个列表:

//  Don't want to do this collect...too slow
  val idFilter = df1.select("id").distinct().map(r => r.getLong(0)).collect.toList
  val df2: DataFrame = spark.read.format("parquet").load("<path>") 
    .filter($"id".isin(idFilter: _*))

您的想法会导致处理速度变慢,因为读取时您必须检查每一行。 - Ramesh Maharjan
没有collect,你应该进行左半连接,参见例如https://stackoverflow.com/a/49658868/1138523 - Raphael Roth
1
但我非常确定它将首先完整读取数据框,然后再执行连接操作。我试图避免这种情况,并将筛选器下推到源头。 - horatio1701d
你这个问题解决了吗?我也遇到了完全相同的问题,使用JOIN会导致对整个表进行扫描,而不是从其他数据集中获取值进行过滤。 - D. Müller
1个回答

0

除非您自己实现了DataSource,否则不能直接使用谓词下推。谓词下推是由Spark数据源提供的机制,每个数据源必须单独实现。

对于基于文件的数据源,已经有一个简单的基于磁盘分区的机制。

考虑以下DataFrame:

val df = Seq(("test", "day1"), ("test2", "day2")).toDF("data", "day")

如果我们以以下方式将该DataFrame保存到磁盘上:
df.write.partitionBy("day").save("/tmp/data")

结果将是以下文件夹结构

tmp -
     |
     | - data - |
                |
                |--day=day1 -|- part1....parquet
                |            |- part2....parquet
                |
                |--day=day2 -|- part1....parquet
                             |- part2....parquet

如果您现在是这样使用此数据源的:

spark.read.load("/tmp/data").filter($"day" = "day1").show()

Spark不会加载day2文件夹的数据,因为没有必要。

这是一种谓词下推的类型,适用于spark支持的每种标准文件格式。

更具体的机制是parquet。Parquet是一种基于列的文件格式,这意味着很容易过滤掉列。如果您有基于parquet的文件,其中包含3列abc,在文件/tmp/myparquet.parquet中,以下查询:

spark.read.parquet("/tmp/myparquet.parquet").select("a").show()

这将导致内部谓词下推,其中Spark仅获取列a的数据,而不读取列bc的数据。

如果有人对实现此特性所建立的机制感兴趣:

/**
 * A BaseRelation that can eliminate unneeded columns and filter using selected
 * predicates before producing an RDD containing all matching tuples as Row objects.
 *
 * The actual filter should be the conjunction of all `filters`,
 * i.e. they should be "and" together.
 *
 * The pushed down filters are currently purely an optimization as they will all be evaluated
 * again.  This means it is safe to use them with methods that produce false positives such
 * as filtering partitions based on a bloom filter.
 *
 * @since 1.3.0
 */
@Stable
trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

可以在https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala中找到。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接