将 MySQL 数据表转换为 Spark 数据集与从 CSV 文件转换相比速度非常慢。

6

我在亚马逊s3上有一个大小为62MB的csv文件(114,000行)。我正在将其转换为Spark数据集,并从中取出前500行。代码如下:

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set=df.load("s3n://"+this.accessId.replace("\"", "")+":"+this.accessToken.replace("\"", "")+"@"+this.bucketName.replace("\"", "")+"/"+this.filePath.replace("\"", "")+"");

 set.take(500)

整个操作需要20到30秒钟。
现在我正在尝试相同的操作,但不是使用csv,而是使用包含119000行的MySQL表。MySQL服务器位于Amazon EC2上。代码如下:
String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password;

SparkSession spark=StartSpark.getSparkSession();

SQLContext sc = spark.sqlContext();

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set = sc
            .read()
            .option("url", url)
            .option("dbtable", this.tableName)
            .option("driver","com.mysql.jdbc.Driver")
            .format("jdbc")
            .load();
set.take(500);

这需要5到10分钟的时间。我正在jvm中运行spark。在两种情况下都使用相同的配置。

我可以使用partitionColumn,numParttition等参数,但我没有任何数字列,并且另一个问题是表的架构对我来说是未知的。

我的问题不是如何减少所需的时间,因为我知道在理想情况下,spark将在集群中运行,但我无法理解为什么上述两种情况存在如此大的时间差异?


这行代码 DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true); 真的必要吗? - Adonis
不,完全没有必要,我只是在测试其他东西,忘记删除了。 - KOUSIK MANDAL
2个回答

10

这个问题在 StackOverflow 上已经被多次讨论:

还有其他外部来源:

所以再次强调,默认情况下 DataFrameReader.jdbc 不会分发数据或读取。它只使用单个线程、单个执行器。

要分发读取操作:

  • 使用 lowerBound / upperBound 范围:

    Properties properties;
    Lower
    
    Dataset<Row> set = sc
        .read()
        .option("partitionColumn", "foo")
        .option("numPartitions", "3")
        .option("lowerBound", 0)
        .option("upperBound", 30)
        .option("url", url)
        .option("dbtable", this.tableName)
        .option("driver","com.mysql.jdbc.Driver")
        .format("jdbc")
        .load();
    
  • predicates

  • Properties properties;
    Dataset<Row> set = sc
        .read()
        .jdbc(
            url, this.tableName,
            {"foo < 10", "foo BETWWEN 10 and 20", "foo > 20"},
            properties
        )
    

我正在尝试使用numPartitions=32读取具有500M行的MySQL表格。然而,使用Spark进行阅读仍然比(也是32个任务)sqoop慢得多。我甚至尝试将fetchsize设置为更高的值(1k或10k),但没有任何收益。我正在使用标准的Connector/J v5.1.41,并且我正在使用MySQL v5.6 - y2k-shubham
有没有办法在Spark 2.4中改进JDBC读取器的并行性能? - Sree51

-2

请按照以下步骤操作

1.下载MySQL的JDBC连接器副本。我相信您已经有了一个。

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar

2. 在以下格式中创建一个 db-properties.flat 文件

jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>

3.首先创建一个空表,用于加载数据。

使用驱动程序类调用Spark Shell

spark-shell --driver-class-path  <your path to mysql jar>

然后导入所有必需的包

import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

初始化一个Hive上下文或SQL上下文

val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql

设置一些属性

sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

从文件加载mysql数据库属性

val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db-        properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")

创建一个查询来读取您的表中的数据,并将其传递给#sqlcontext的read方法。这是您可以管理where子句的地方

val df1 = "(SELECT  * FROM your_table_name) as s1" 

将JDBC URL、选择查询和数据库属性传递给read方法

val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)

将其写入您的表格中

df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")

你认为你有什么不同的做法可以解决所提出的问题,即*MySQL中的表格缓慢读入DataFrame*?你能否指出问题中提供的代码片段*中存在的错误/低效之处,并说明你提供的代码片段*对其进行了改进? - y2k-shubham

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接