我正在尝试使用Spark 1.6.1将一个parquet
文件写入到Amazon S3
。我生成的小型parquet
文件一旦写入就是~2GB
,因此数据量不是很大。我正在尝试将Spark作为我可以使用的平台证明。
基本上,我正在使用dataframes
设置star schema
,然后将这些表写入parquet中。数据来自供应商提供的csv文件,我使用Spark作为ETL
平台。我目前在ec2(r3.2xlarge)
上有一个由3个节点组成的集群,因此执行程序的内存总共有120GB
,并且总共有16个核心。
输入文件总共约为22GB,现在我正在提取其中的大约2GB数据。当我开始加载完整数据集时,最终数据集将达到数千亿字节。
以下是我的Spark/Scala pseudocode
:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
统计465884512行需要大约2分钟。将结果写入parquet格式需要38分钟。
我知道coalesce
会将数据洗牌到驱动器上进行写操作...但是它所花费的时间让我认为我做错了什么。如果不用coalesce
,这个过程还需要15分钟,而且会产生大量小的parquet
文件。我想要每天只有一个大文件存放所有数据。我已经编写了根据字段值进行分区的代码,但速度同样很慢。我也尝试将其输出为csv
,但需要约1小时。
此外,在提交作业时,我没有设置运行时属性。我的控制台统计信息如下:
- 存活的工作节点:2
- 正在使用的内核:16个总共、16个已使用
- 正在使用的内存:117.5 GB总共、107.5 GB已使用
- 应用程序:1个正在运行、5个已完成
- 驱动器:0个正在运行、0个已完成
- 状态:存活