将Delta表复制到Databricks后,写入速度显著提高。

5

我正在将一个PySpark数据框合并到Delta表中。输出的Delta表按日期进行分区。以下查询需要30秒才能运行:

query = DeltaTable.forPath(spark, PATH_TO_THE_TABLE).alias(
            "actual"
        ).merge(
            spark_df.alias("sdf"),
            "actual.DATE >= current_date() - INTERVAL 1 DAYS 
             AND (actual.feat1 = sdf.feat1) 
             AND (actual.TIME = sdf.TIME) 
             AND (actual.feat2 = sdf.feat2) "
            ,
        ).whenNotMatchedInsertAll()

将Delta内容复制到另一个位置后,使用NEW_PATH替代PATH_TO_THE_TABLE时,上述查询变得快60倍(即在同一集群上只需0.5秒)。以下是复制Delta的命令:

(spark.read.format("delta").load(PATH_TO_THE_TABLE).write.format(
        "delta"
    )
.mode("overwrite").partitionBy(["DATE"]).save(NEW_PATH))

如何让对第一个增量的查询像对新增量一样快?我了解增量有版本控制系统,我怀疑这是它需要花费大量时间的原因。我尝试过对增量表进行清理(将查询时间降至20秒),但离0.5秒仍有很大距离。

技术栈:

  • Python 3.7;
  • Pyspark 3.0.1;
  • Databricks Runtime 7.3 LTS

使用了哪个Databricks运行时?原始表是如何创建的? - Alex Ott
数据块运行环境 = 7.3 LTS。第一个表格是通过类似于上面的命令创建的: df.write.format( "delta" ) .mode("overwrite").partitionBy(["DATE"]).save(NEW_PATH)) - Noé Achache
1
@NoéAchache你解决了吗?NEW_PATH是在同一类型的磁盘上吗?因为可能磁盘写入速度不同。 - Oliver Angelil
1个回答

0

我看到你比较 actual.DATE >= current_date() ,因为这是你查询中最重要的部分,请尝试按该字段定期运行 ZORDER 排序增量:

OPTIMIZE actual ZORDER BY (actual.DATE)

您也可以尝试完全清空 Delta:

VACUUM actual RETAIN 0 HOURS

要做到这一点,您需要设置spark.databricks.delta.retentionDurationCheck.enabled false

如果您不想使用delta的好处(事务、并发写入、时间旅行历史等),您可以只使用parquet。


感谢您的回答。如帖子中所述,我已经清空了Delta,这确实有所帮助,但仅将查询时间减少了约10秒。关于ZORDER,在分区键上不起作用(我猜它已经被优化了?)。使用Parquet可能确实是一个更好的想法,但我找不到一个允许将pyspark df合并到Parquet的Python库? - Noé Achache

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接