使用Spark通过s3a将parquet文件写入S3非常缓慢。

22

我正在尝试使用Spark 1.6.1将一个parquet文件写入到Amazon S3。我生成的小型parquet文件一旦写入就是~2GB,因此数据量不是很大。我正在尝试将Spark作为我可以使用的平台证明。

基本上,我正在使用dataframes设置star schema,然后将这些表写入parquet中。数据来自供应商提供的csv文件,我使用Spark作为ETL平台。我目前在ec2(r3.2xlarge)上有一个由3个节点组成的集群,因此执行程序的内存总共有120GB,并且总共有16个核心。

输入文件总共约为22GB,现在我正在提取其中的大约2GB数据。当我开始加载完整数据集时,最终数据集将达到数千亿字节。

以下是我的Spark/Scala pseudocode:

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  }

统计465884512行需要大约2分钟。将结果写入parquet格式需要38分钟

我知道coalesce会将数据洗牌到驱动器上进行写操作...但是它所花费的时间让我认为我做错了什么。如果不用coalesce,这个过程还需要15分钟,而且会产生大量小的parquet文件。我想要每天只有一个大文件存放所有数据。我已经编写了根据字段值进行分区的代码,但速度同样很慢。我也尝试将其输出为csv,但需要约1小时。

此外,在提交作业时,我没有设置运行时属性。我的控制台统计信息如下:

  • 存活的工作节点:2
  • 正在使用的内核:16个总共、16个已使用
  • 正在使用的内存:117.5 GB总共、107.5 GB已使用
  • 应用程序:1个正在运行、5个已完成
  • 驱动器:0个正在运行、0个已完成
  • 状态:存活

2
一个coalesce操作并不会在驱动程序中进行洗牌,而是在执行器之间进行洗牌,但这与您所看到的问题无关。您正在使用EMR吗?如果是,请使用s3://而不是s3a://。无论如何,在Spark 1.6上,您应该像@David所说的那样使用Direct OutputCommitter。另一个可能的改进是将parquet.enable.summary-metadata设置为false。 - Tal Joffe
在S3前使用Alluxio会加速吗? - James Moore
4个回答

19

Spark默认设置在I/O操作期间会产生大量的(可能)不必要的开销,尤其是在写入S3时。 这篇文章更详细地讨论了这个问题,但有两个设置您需要考虑更改。

  • 使用DirectParquetOutputCommitter。默认情况下,Spark会将所有数据保存到临时文件夹中,然后移动这些文件。使用DirectParquetOutputCommitter将直接写入S3输出路径,节省时间

    • 在Spark 2.0+中不再可用
      • 如Jira票证所述,当前的解决方案是:
        1. 将您的代码切换到使用s3a和Hadoop 2.7.2+;它在各个方面都更好,在Hadoop 2.8中变得更好,并成为s3guard的基础
        2. 使用Hadoop FileOutputCommitter,并将mapreduce.fileoutputcommitter.algorithm.version设置为2

    -自Spark 1.5起,默认关闭模式合并 Spark 1.5 关闭模式合并。如果启用了模式合并,则驱动节点将扫描所有文件以确保一致的模式。这特别昂贵,因为它不是分布式操作。通过执行以下操作确保关闭此功能:

    val file = sqx.read.option("mergeSchema", "false").parquet(path)


3
自 Spark 2.0 起,DirectParquetOutputCommitter 已不再可用。请参考 SPARK-10063 查看新的解决方案。 - Tal Joffe
@TalJoffe 你试过他们的解决方案吗?如果是,它起作用了吗?你能说一下怎么做的吗? - David
我尝试过了,效果非常好。我对一个30克的文件夹进行了小型测试,性能基本相同。 - Tal Joffe
2
如果性能基本相同,那么这是否意味着它并不是一个有效的解决方案? - zzztimbo
1
@zzztimbo 我理解他的评论是指出的解决方法和已弃用的 DirectParquetOutputCommitter 一样表现出色(因此比写parquet文件的开箱即用方式更好)。但是,我还没有尝试过。 - David
2
自Spark 1.5.0版本开始,默认情况下合并模式也是false。http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging - Kamil Sindi

4
直接输出committer已经从spark代码库中删除;您需要编写自己的代码或者恢复被删除的代码到您自己的JAR文件中。如果这样做,请在您的工作中关闭推测,并知道其他故障也可能引起问题,其中问题是“无效数据”。
更令人高兴的是,Hadoop 2.8将为从S3读取优化的二进制格式(ORC,Parquet)添加一些S3A加速功能。有关详细信息,请参见HADOOP-11694。一些人正在努力使用Amazon Dynamo作为一致的元数据存储,这应该能够在工作结束时进行强大的O(1)提交。

2
一种加速Spark写入S3的方法是使用EMRFS S3-optimized Committer。然而,如果您使用s3a,则无法使用此提交程序:

When the EMRFS S3-optimized Committer is Not Used

The committer is not used under the following circumstances:

When writing to HDFS

-> When using the S3A file system

When using an output format other than Parquet, such as ORC or text

When using MapReduce or Spark's RDD API
我已经在AWS EMR 5.26上测试了这个差异,并且使用s3://比s3a://快15%-30%(但仍然很慢)。
我成功实现此类复制/写入的最快方法是将Parquet写入本地HDFS,然后使用s3distcp将其复制到S3;在一个特定的场景中(几百个小文件),这比直接将DataFrame写入Parquet到S3快5倍。

1
赞成先写入HDFS,然后将这些文件移动到s3的想法(尽管我使用gnu parallel + aws cli命令代替s3distcp)。当然,这取决于您的数据,这并不是万能解决方案。 - James Moore

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接