How can I integrate Apache Spark with MySQL to read database tables as Spark DataFrames?

37

I want to run my existing application with Apache Spark and MySQL.


Maybe this will help: http://www.infoobjects.com/spark-sql-jdbcrdd/ - Gábor Bakos
10 Answers

43

With pySpark, this worked for me:

dataframe_mysql = mySqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/my_db_name",
    driver="com.mysql.jdbc.Driver",
    dbtable="my_tablename",
    user="root",
    password="root").load()
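For readability, the same settings can be collected in a plain Python dict and unpacked with `.options(**opts)` (pySpark's `DataFrameReader.options` accepts keyword arguments). A minimal sketch, reusing the placeholder host, database, and credentials from the snippet above:

```python
# JDBC connection settings gathered in one place; all values are
# placeholders taken from the snippet above - replace with your own.
jdbc_options = {
    "url": "jdbc:mysql://localhost:3306/my_db_name",
    "driver": "com.mysql.jdbc.Driver",
    "dbtable": "my_tablename",
    "user": "root",
    "password": "root",
}

# With a live SQLContext the read then becomes:
# dataframe_mysql = mySqlContext.read.format("jdbc").options(**jdbc_options).load()
```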

10
"mySqlContext" should be "sqlContext". - shellbye
3
It's just a variable. You can name it whatever you like: any_name = SQLContext(sc) - disp_name
If I use ODBC instead of JDBC, do I just swap the two in the text above? Are they otherwise identical? - lwileczek
3
For Spark 2.x, use the following to connect to the database: dataframe = spark_session.read.format("jdbc").options(...).load() - Abdul Mannan
Where do you get that driver? Does it need a matching jar? - sheetal_158

23

With Spark 2.0.x you can use the DataFrameReader and DataFrameWriter. Use SparkSession.read to access the DataFrameReader, and Dataset.write to access the DataFrameWriter.

Assuming you're using spark-shell.

read example

val prop = new java.util.Properties()
prop.put("user", "username")
prop.put("password", "yourpassword")
val url = "jdbc:mysql://host:port/db_name"

val df = spark.read.jdbc(url, "table_name", prop)
df.show()

read example 2

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/db_name")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

read example 3

If you want to read data from a query result rather than from a table directly:

Source: Spark documentation

val sql="""select * from db.your_table where id>1"""
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/db_name")
  .option("dbtable",  s"( $sql ) t")
  .option("user", "username")
  .option("password", "password")
  .load()
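The trick in read example 3 — the query must be parenthesised and given an alias before it can be passed as dbtable — can be captured in a tiny helper. A sketch in Python; the function name is my own, not part of any Spark API:

```python
def query_as_dbtable(sql, alias="t"):
    """Wrap an arbitrary SQL query so it is accepted as the 'dbtable'
    option: JDBC sources expect a table name or an aliased subquery."""
    return "({}) {}".format(sql.strip(), alias)

print(query_as_dbtable("select * from db.your_table where id > 1"))
# (select * from db.your_table where id > 1) t
```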

write example

import org.apache.spark.sql.SaveMode

val prop = new java.util.Properties()
prop.put("user", "username")
prop.put("password", "yourpassword")
val url = "jdbc:mysql://host:port/db_name"
// df is a DataFrame containing the data you want to write
df.write.mode(SaveMode.Append).jdbc(url, "table_name", prop)



Works great, nice and clean! Thanks for this. - Anubhav Dikshit
How can we delete records from a MySQL connection using Spark? - Rajiv Singh

16

With Scala, this worked for me. Use the following command:

sudo -u root spark-shell --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar,/mnt/resource/lokeshtest/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar --packages com.databricks:spark-csv_2.10:1.2.0

import org.apache.spark.sql.SQLContext

val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

val dataframe_mysql = sqlcontext.read.format("jdbc")
  .option("url", "jdbc:mysql://Public_IP:3306/DB_NAME")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tblage")
  .option("user", "sqluser")
  .option("password", "sqluser")
  .load()

dataframe_mysql.show()

12

If you use sbt, this works for Scala too.

In your build.sbt file:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.6.2",
    "org.apache.spark" %% "spark-sql" % "1.6.2",
    "org.apache.spark" %% "spark-mllib" % "1.6.2",
    "mysql" % "mysql-connector-java" % "5.1.12"
)

Then you only need to declare your use of the driver:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

Class.forName("com.mysql.jdbc.Driver").newInstance

val conf = new SparkConf().setAppName("MY_APP_NAME").setMaster("MASTER")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val data = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://<HOST>:3306/<database>")
  .option("user", "<USERNAME>")
  .option("password", "<PASSWORD>")
  .option("dbtable", "MYSQL_QUERY") // a table name, or a subquery such as "(select ...) as t"
  .load()

2
Looks like autocomplete got you: com.myself.jdbc.Driver -> com.mysql.jdbc.Driver? - Rodrigo Del C. Andrade
You're right! Thanks for pointing that out. - jstuartmill
1
Why is "dbtable" not a table name but "MYSQL_QUERY"? - Hoang Minh Quang FX15045

7

For a Java project using Maven, add the Spark dependencies and the MySQL driver dependency to your pom.xml file:

<properties>
    <java.version>1.8</java.version>
    <spark.version>1.6.3</spark.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
 <dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Sample code, assuming your MySQL instance is local, the database name is test, the username is root, the password is password, and the test database has two tables, table1 and table2:

SparkConf sparkConf = new SparkConf();
SparkContext sc = new SparkContext("local", "spark-mysql-test", sparkConf);
SQLContext sqlContext = new SQLContext(sc);

// here you can run sql query
String sql = "(select * from table1 join table2 on table1.id=table2.table1_id) as test_table";
// or use an existing table directly
// String sql = "table1";
DataFrame dataFrame = sqlContext
    .read()
    .format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true")
    .option("user", "root")
    .option("password", "password")
    .option("dbtable", sql)
    .load();

// continue your logical code
......
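The URL in the snippet above packs several driver flags (useUnicode, characterEncoding, autoReconnect) into the query string. How such a URL is assembled can be sketched in Python; the helper is illustrative only, not a Spark or MySQL API:

```python
from urllib.parse import urlencode

def mysql_jdbc_url(host, port, database, **params):
    """Build a MySQL JDBC URL, appending optional driver parameters
    such as useUnicode or characterEncoding as a query string."""
    url = "jdbc:mysql://{}:{}/{}".format(host, port, database)
    if params:
        url += "?" + urlencode(params)
    return url

print(mysql_jdbc_url("127.0.0.1", 3306, "test",
                     useUnicode="true", characterEncoding="UTF-8"))
# jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8
```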

6
public static void main(String[] args) {
    Map<String, String> options = new HashMap<String, String>();
    options.put("url","jdbc:postgresql://<DBURL>:<PORT>/<Database>?user=<UserName>&password=<Password>");
    options.put("dbtable", "<TableName>");
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("DBConnection").setMaster("local[*]"));
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    // DataFrame jdbcDF = sqlContext.load("jdbc", options).cache();
    DataFrame jdbcDF = sqlContext.jdbc(options.get("url"),options.get("dbtable"));
    System.out.println("Data------------------->" + jdbcDF.toJSON().first());
    Row[] rows = jdbcDF.collect();
    System.out.println("Without Filter \n ------------------------------------------------- ");
    for (Row row2 : rows) {
        System.out.println(row2.toString());
    }
    System.out.println("Filter Data\n ------------------------------------------------- ");
    jdbcDF = jdbcDF.select("agency_id","route_id").where(jdbcDF.col("route_id").$less$eq(3));
    rows = jdbcDF.collect();
    for (Row row2 : rows) {
        System.out.println(row2.toString());
    }
}

1
This code will help connect Spark to the database. - Jatin

5

For Java, this worked:

@Bean
public SparkConf sparkConf() {
    SparkConf sparkConf = new SparkConf()
            .setAppName(appName)
            .setSparkHome(sparkHome)
            .setMaster(masterUri);

    return sparkConf;
}

@Bean
public JavaSparkContext javaSparkContext() {
    return new JavaSparkContext(sparkConf());
}

@Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .sparkContext(javaSparkContext().sc())
            .appName("Java Spark SQL basic example")
            .getOrCreate();
}

Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "root");
properties.put("driver", "com.mysql.cj.jdbc.Driver");
sparkSession.read()
    .jdbc("jdbc:mysql://localhost:3306/books?useSSL=false", "(SELECT books.BOOK_ID as BOOK_ID, books.BOOK_TITLE as BOOK_TITLE, books.BOOK_AUTHOR as BOOK_AUTHOR, borrowers.BORR_NAME as BORR_NAME FROM books LEFT OUTER JOIN borrowers ON books.BOOK_ID = borrowers.BOOK_ID) as t", properties) // join example
    .show();

Of course, for MySQL I needed the connector:
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>

I got:

+-------+------------------+--------------+---------------+
|BOOK_ID|        BOOK_TITLE|   BOOK_AUTHOR|      BORR_NAME|
+-------+------------------+--------------+---------------+
|      1|        Gyűrű kúra|J.R.K. Tolkien|   Sára Sarolta|
|      2|     Kecske-eledel|     Mekk Elek|Maláta Melchior|
|      3|      Répás tészta| Vegán Eleazár|           null|
|      4|Krumpli és pityóka| Farmer Emília|           null|
+-------+------------------+--------------+---------------+

4

Based on this infoobjects article, try the following (assuming Java or Scala; not sure how to do it in Python):

  • Add mysql-connector-java to your Spark cluster classpath
  • Initialize the driver: Class.forName("com.mysql.jdbc.Driver")
  • Create the JdbcRDD data source:

val myRDD = new JdbcRDD( sc, () => 
                               DriverManager.getConnection(url,username,password),
                        "select first_name,last_name,gender from person limit ?, ?",
                        1,//lower bound
                        5,//upper bound
                        2,//number of partitions
                        r =>
                          r.getString("last_name") + ", " + r.getString("first_name"))

JdbcRDD is discouraged now. Better to check out the DataFrame interface in Spark 1.4 and newer. - Matt Ingenthron
@MattIngenthron That's true, although it wasn't available when the question was asked and answered. - Gábor Bakos
1
OK, understood. I found this while searching and others probably will too, so I updated it to make sure newcomers find the latest approach. - Matt Ingenthron

2
val query: String =
  "select col1, col2 from schema.table_name where condition"

val url = "jdbc:mysql://<ip>:3306/<schema>"
val username = ""
val password = ""
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.load("jdbc", Map(
  "url" -> (url + "?user=" + username + "&password=" + password),
  "dbtable" -> s"($query) as tbl",
  "driver" -> "com.mysql.jdbc.Driver"))

df.show()

SQLContext.load is now deprecated and will be removed in 2.0. - kane

2

With Spark 2.1.0 and Scala (on Windows 7), the following code worked very well for me:

import org.apache.spark.sql.SparkSession

object MySQL {
  def main(args: Array[String]) {
    // First, create a SparkSession as the entry point of your app
    val spark:SparkSession = SparkSession
      .builder()
      .appName("JDBC")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "C:/Exp/")
      .getOrCreate();    

    val dataframe_mysql = spark.read.format("jdbc")
                          .option("url", "jdbc:mysql://localhost/feedback")
                          .option("driver", "com.mysql.jdbc.Driver")
                          .option("dbtable", "person") //replace with own
                          .option("user", "root") //replace with own 
                          .option("password", "vertrigo") // replace with own
                          .load()

    dataframe_mysql.show()
  }
}

Specifying the driver option, as in your answer, was necessary for me to get it working. - rauljosepalma

Page content provided by Stack Overflow; translated from the English original.