Spark:将大型MySQL表读入DataFrame失败

7
我想提前说明的是,以下几个相关问题,比如下面这些,都不能解决我的问题:

这个问题接近,但堆栈跟踪不同,而且无论如何都没有解决。所以请放心,我在经过数天(失败的)解决方案搜索后才发布了这个问题。


我正在尝试编写一个任务,将每天的数据从MySQL表移动到存储为Parquet/ORC文件在Amazon S3上的Hive表中。其中一些表相当大:约300M条记录,大小为200GB+(由phpMyAdmin报告)。
目前,我们使用sqoop进行此操作,但出于以下原因,我们想转移到Spark:
- 利用其DataFrame API功能(未来,我们将在移动数据时执行转换) - 我们已经在组织其他地方编写了一个规模可观的Scala框架,用于Spark作业
我已经能够在小的MySQL表上成功实现,没有遇到任何问题。但是,如果一次尝试检索超过大约1.5-2M条记录,读取数据从MySQL到DataFrame的Spark作业将会失败。下面是相关部分的堆栈跟踪,您可以在此处找到完整的堆栈跟踪。
...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
...
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
...
org.apache.spark.status.api.v1.OneStageResource.taskSummary(OneStageResource.scala:62)
    at sun.reflect.GeneratedMethodAccessor188.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
...
[Stage 27:>                                                       (0 + 30) / 32]18/03/01 01:29:09 WARN TaskSetManager: Lost task 3.0 in stage 27.0 (TID 92, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal, executor 6): java.sql.SQLException: Incorrect key file for table '/rdsdbdata/tmp/#sql_14ae_5.MYI'; try to repair it
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
...

** 在移动一个包含186M条记录的148 GB表格失败后,获得了这个堆栈跟踪

从(完整)堆栈跟踪可以看出,Spark读取作业开始因None.get错误而false warnings陷入困境,随后是SQLException: Incorrect key for file..(与MySQLtmp table becoming full有关)。


现在显然这不可能是一个 MySQL 的问题,因为如果是的话 sqoop 也应该会失败。就 Spark 而言,我通过设置 numPartitions = 32(我们使用 sqoop 的并行度为 40)来并行化读操作。从我对 Spark 和 BigData 的有限了解来看,148 GB 对于 Spark 来说不应该是令人不堪重负的。此外,由于 MySQL、Spark(EMR)和 S3 都位于同一区域(AWS AP-SouthEast),因此延迟不应该成为瓶颈。

我的问题是:

  1. Spark是否适合此工具?
  2. SparkJdbc driver是否有问题导致了这个问题?
  3. 如果以上问题的答案是
    • 是:我该如何克服它?(替代驱动程序或其他解决方法)?
    • 否:可能的原因是什么?

框架配置:

  • Hadoop分布式版本:Amazon 2.8.3
  • Spark 2.2.1
  • Hive 2.3.2
  • Scala 2.11.11

EMR配置:

  • EMR 5.12.0
  • 1 主节点: r3.xlarge [8 vCore, 30.5 GiB 内存, 80 SSD GB 存储 EBS 存储:32 GiB]
  • 1 任务节点: r3.xlarge [8 vCore, 30.5 GiB 内存, 80 SSD GB 存储 EBS 存储:none]
  • 1 核心节点: r3.xlarge [8 vCore, 30.5 GiB 内存, 80 SSD GB 存储 EBS 存储:32 GiB]

** 这些是开发集群的配置;生产集群将会配备更好的设备。


2
你好,有几个问题1)为什么不使用sqoop将数据保存到HDFS中,然后使用Spark读取该文件,最后将其插入到Hive中? 2)为这个Spark进程分配了哪些资源? - Felix
1
我同意Felix的观点。Sqoop是一种专门用于将关系型数据库数据移动到HDFS的工具。我自己在使用Spark JDBC时也遇到了一些问题。或者,如果您真的坚持要通过Scala应用程序完成此操作,您还可以使用Sqoop Java API。 - philantrovert
在所有情况下,由于您正在使用EMR,而不是分两次进行操作。使用Sqoop将数据转储到parquet中,无论您是否需要S3都可以,然后使用Spark拉取数据并执行转换。 - eliasah
@Viv 请查看此评论。您确定在DataFrameReader.jdbc(..)方法table参数中没有传递类似于(SELECT col_a, col_b, col_c FROM my_db.my_table) AS ql的SQL查询吗? - y2k-shubham
记录一下,这个看起来非常相关。 - y2k-shubham
显示剩余9条评论
1个回答

2

Spark JDBC API似乎会分叉以将MySQL表中的所有数据加载到内存中,因此当您尝试加载大表时,应该使用Spark API先克隆数据到HDFS(应该使用JSON保持模式结构),像这样:

spark.read.jdbc(jdbcUrl, tableName, prop)
       .write()
       .json("/fileName.json");

然后你可以正常地在HDFS上工作。
spark.read().json("/fileName.json")
       .createOrReplaceTempView(tableName);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接