在Spark中禁用Parquet元数据摘要

9
我有一个Spark作业(适用于1.4.1版本),接收来自Kafka事件流。我想将它们连续保存为Tachyon上的parquet文件。
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

lines.window(Seconds(1), Seconds(1)).foreachRDD { (rdd, time) =>
  if (rdd.count() > 0) {
    val mil = time.floor(Duration(86400000)).milliseconds
    hiveContext.read.json(rdd).toDF().write.mode(SaveMode.Append).parquet(s"tachyon://192.168.1.12:19998/persisted5$mil")
    hiveContext.sql(s"CREATE TABLE IF NOT EXISTS persisted5$mil USING org.apache.spark.sql.parquet OPTIONS ( path 'tachyon://192.168.1.12:19998/persisted5$mil')")
  }
}

然而,随着时间的推移,我发现在每次写入Parquet时,Spark都会遍历每个1秒的Parquet部分,这使得速度越来越慢。
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-db03b24d-6f98-4b5d-bb40-530f35b82633.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-3a7857e2-0435-4ee0-ab2c-6d40224f8842.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-47ff2ac1-da00-4473-b3f7-52640014bc5b.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-61625436-7353-4b1e-bb8d-e8afad3a582e.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-e711aa9a-9bf5-41d5-8523-f5edafa69626.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-4e0cca38-cf75-4771-8965-20a30c863100.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-d1510ed4-2c99-43e2-b3d1-38d3d54e626d.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-022d1918-392d-433f-a7f4-074e46b4460f.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-cf71f5d2-ba0e-4729-9aa1-41dad5d1d08f.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-ce990b1e-82cc-4feb-a162-ac3ddc275609.gz.parquet, 65536)

我得出结论是这是由于摘要数据的更新,我相信Spark没有使用它们,所以我想禁用它。 Parquet源显示我应该能够将 "parquet.enable.summary-metadata" 设置为false。
现在,我尝试像这样设置它,在创建hiveContext之后立即进行。
hiveContext.sparkContext.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
hiveContext.sparkContext.hadoopConfiguration.setInt("parquet.metadata.read.parallelism", 10) 

但是没有成功,我仍然能看到显示默认并行度为5的日志。
在Spark中使用Parquet禁用汇总数据的正确方法是什么?
2个回答

15

Spark 2.0默认不再保存元数据摘要,详见SPARK-15719.

如果您正在使用托管在S3中的数据,则可能仍会发现Parquet性能受到影响,因为Parquet本身试图扫描所有对象的尾部以检查它们的模式。可以显式禁用此功能。

sparkConf.set("spark.sql.parquet.mergeSchema", "false")

3
自Spark 1.5.0版本开始,默认关闭了模式合并功能,因为在大多数情况下,它是一个相对昂贵的操作且非必需的。 - Mark Rajcok

14

将"parquet.enable.summary-metadata"设置为文本("false" and not false)对我们来说似乎有效。

顺便说一下,Spark确实使用_common_metadata文件(我们手动复制它以进行重复的作业)。


如果我禁用shemaMerging,元数据仍然会被使用吗?在beeline中似乎工作正常。 - Pierre Lacave
在Spark 1.3中没有,而在1.4.1中仍然寻找_common_metadata(可能是一个错误)。 - Arnon Rotem-Gal-Oz
如果元数据缺失,刷新时似乎会回退到读取页脚部分。但是它能够工作(仍然很慢,基本上把问题从插入移到了刷新)。 - Pierre Lacave
在较新的Spark版本中,我们并没有发现这是一个问题,并重新启用了它 - 但是无论如何,当我们没有生成元数据时,我们会从先前的作业中复制基本元数据(相同的输出结构...)。 - Arnon Rotem-Gal-Oz
模式已更改,我们编写了一些处理程序,在读取时应用转换来处理它 - 正如我在先前的评论中提到的,我们没有发现需要在较新的Spark版本中禁用元数据创建。 - Arnon Rotem-Gal-Oz
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接