我正在开发一个应用程序,需要从S3加载数据集。功能正常,但性能出乎意料地慢。
数据集采用CSV格式。每个文件大约有700万条记录(行),每个文件大小为600-700MB。
val spark = SparkSession
.builder()
.appName("MyApp")
.getOrCreate()
val df = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv(inFileName:_*)
// inFileName is a list that current contains 2 file names
// eg. s3://mybucket/myfile1.csv
val r = df.rdd.flatMap{ row =>
/*
* Discard poorly formated input records
*/
try {
totalRecords.add(1)
// this extracts several columns from the dataset
// each tuple of indexColProc specifies the index of the column to
// select from the input row, and a function to convert
// the value to an Int
val coords = indexColProc.map{ case (idx, func) => func( row.get(idx).toString ) }
List( (coords(0), coords) )
}
catch {
case e: Exception => {
badRecords.add(1)
List()
}
}
}
println("Done, row count " + r.count )
我在由 5 台 m3.xlarge 机器组成的 AWS 集群上运行了此应用程序。将 maximizeResourceAllocation 参数设置为 true,并且这是集群上唯一运行的应用程序。
我分别使用指向 S3 上文件的 'inFileName' 和指向 Hadoop 文件系统中文件的本地副本的 'inFileName' 两次运行了该应用程序。
当我查看 Spark 历史服务器并深入到对应于最终 r.count 操作的作业时,我发现访问 S3 上的文件需要 2.5 分钟,而访问本地 HDFS 上的文件仅需 18 秒。当我在较小的集群或 master=local 配置下运行相同实验时,结果类似。
当我使用
aws s3 cp <file>
传输一个600-700MB的文件只需要6.5秒钟。因此,似乎实例机器的原始I/O并没有对减速做出太大贡献。
访问s3时是否存在这种缓慢的表现预期?如果不是,能否有人指出我错在哪里。 如果是预期的,是否有其他方法可以提高性能?还是我需要开发一些东西,在应用程序运行之前将文件从s3复制到hdfs?
flatMap
/map
之前,可以尝试使用df.cache()
。 - maxymoo