从S3读取CSV文件到Spark dataframe的速度是否应该如此缓慢?

13

我正在开发一个应用程序,需要从S3加载数据集。功能正常,但性能出乎意料地慢。

数据集采用CSV格式。每个文件大约有700万条记录(行),每个文件大小为600-700MB。

val spark = SparkSession
       .builder()
       .appName("MyApp")
       .getOrCreate()

val df = spark
     .read
    .option("header", "true") 
    .option("inferSchema", "true") 
    .csv(inFileName:_*)
    // inFileName is a list that current contains 2 file names
    // eg.  s3://mybucket/myfile1.csv

val r = df.rdd.flatMap{ row =>
      /*
       * Discard poorly formated input records 
       */
      try {
        totalRecords.add(1)

        // this extracts several columns from the dataset
        // each tuple of indexColProc specifies the index of the column to
        // select from the input row, and a function to convert
        // the value to an Int
        val coords = indexColProc.map{ case (idx, func) => func( row.get(idx).toString ) }

        List( (coords(0), coords) )
      }
      catch {
        case e: Exception => {    
          badRecords.add(1)
          List()
        }
      }
    }

println("Done, row count " + r.count )

我在由 5 台 m3.xlarge 机器组成的 AWS 集群上运行了此应用程序。将 maximizeResourceAllocation 参数设置为 true,并且这是集群上唯一运行的应用程序。

我分别使用指向 S3 上文件的 'inFileName' 和指向 Hadoop 文件系统中文件的本地副本的 'inFileName' 两次运行了该应用程序。

当我查看 Spark 历史服务器并深入到对应于最终 r.count 操作的作业时,我发现访问 S3 上的文件需要 2.5 分钟,而访问本地 HDFS 上的文件仅需 18 秒。当我在较小的集群或 master=local 配置下运行相同实验时,结果类似。

当我使用

aws s3 cp <file>

传输一个600-700MB的文件只需要6.5秒钟。因此,似乎实例机器的原始I/O并没有对减速做出太大贡献。

访问s3时是否存在这种缓慢的表现预期?如果不是,能否有人指出我错在哪里。 如果是预期的,是否有其他方法可以提高性能?还是我需要开发一些东西,在应用程序运行之前将文件从s3复制到hdfs?


在运行 flatMap/map 之前,可以尝试使用 df.cache() - maxymoo
我确实尝试过那个,但它要么没有效果,要么导致管道挂起(当我尝试那个实验时,我在另一台机器实例上)。 - Tim Ryan
3个回答

12

经过进一步的调查,我发现使用S3 native会有很大的区别。我只需将URI前缀更改为 s3n://, 那么该作业的性能就从2.5分钟降至21秒。因此,访问s3与hdfs相比只有3秒的惩罚,这是相当合理的。

当搜索此主题时,有许多帖子提到s3n具有5GB的最大文件大小限制。然而,我遇到了这篇文章,它说最大文件大小限制在Hadoop 2.4.0中增加到了5TB。

"不再推荐使用S3块文件系统。"


1
我们几个月前也遇到了完全相同的问题,但我们的数据达到了1TB,所以问题更加明显。
我们深入研究后得出以下结论: 由于我们有5个实例,每个实例有30个执行器,每次调度阶段时(任务要做的第一件事是从S3获取数据),因此这些任务将在网络带宽上受到瓶颈限制,然后它们都会移动到计算部分的任务并可能同时争夺CPU。
所以基本上因为所有任务都在同时做同样的事情,它们总是在争夺相同的资源。
我们发现只允许在任何时刻执行k个任务将使它们快速完成下载并转移到计算部分,然后下一组k个任务可以开始下载。这样,现在k(而不是所有)任务正在获得完整的带宽,并且某些任务同时在CPU或I/O上做一些有用的事情,而无需在某些常见资源上等待彼此。
希望这可以帮助您。

听起来值得一试。你能提供一些指导,如何实现这种任务执行的细粒度控制吗?在我的情况下,只有8个执行器,每个执行器大约有4个核心。目前我正在m3.xlarge上运行实验,但如果我能克服这个I/O瓶颈,可能会考虑移动到一个具有更多物理核心的实例。 - Tim Ryan
我没有将代码公开,但我们基本上所做的是 - 在每个任务进入网络密集调用之前,我们会调用一个中央服务来跟踪同一块内有多少其他任务,如果小于某个k,则允许它继续,否则要求此任务进行繁忙旋转一秒钟并重试。然后当任务完成网络调用时,它将更新中央服务。诚然不是最优雅的解决方案,但我们能够将作业时间从90分钟缩短到不到50分钟。 - Sachin Tyagi
经过更深入的挖掘,我发现使用S3本地化会有很大的区别。我只需将URI前缀更改为s3n://,针对该作业的性能从2.5分钟降至21秒。因此,访问s3与hdfs相比只有3秒的惩罚,这是相当合理的。不确定是否适用于您的问题,因为我迄今看到的文档表明,在使用S3本地化时存在5GB的文件大小限制。 - Tim Ryan
我刚刚发现了这篇文章,它说在Hadoop 2.4.0中,文件大小限制已经增加到5TB。 - Tim Ryan

0
你尝试过使用spark-csv包吗?它有很多针对读取csv的优化,而且你可以使用mode=MALFORMED来删除你想要过滤的错误行。你可以像这样直接从s3中读取:
csv_rdf<- read.df(sqlContext,"s3n://xxxxx:xxxxx@foldername/file1.csv",source="com.databricks.spark.csv")

更多细节可以在这里找到 https://github.com/databricks/spark-csv


我以前尝试过spark-csv,但性能没有任何改善。但是感谢您提供的mode=MALFORMED指针。我不知道这种模式,它可以帮助简化我的代码。谢谢。 - Tim Ryan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接