从 SQL 查询创建 Spark Dataframe

25

我相信这是一个简单的SQLContext问题,但我在Spark文档或Stackoverflow中找不到答案。

我想从MySQL的SQL查询创建一个Spark Dataframe。

例如,我有一个复杂的MySQL查询:

SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...

我希望得到一个包含X、Y和Z列的数据框。

我已经找到了如何将整个表加载到Spark中,并且我可以将它们全部加载,然后在那里执行连接和选择操作。但这非常低效。我只想加载由我的SQL查询生成的表。

这是我目前近似的代码,但不起作用。mysql-connector有一个“dbtable”选项,可用于加载整个表。我希望有一种指定查询的方法。

  val df = sqlContext.format("jdbc").
    option("url", "jdbc:mysql://localhost:3306/local_content").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", "root").
    option("password", "").
    sql(
"""
select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim o n dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100
"""
    ).load()

有关此内容(以及更多内容)的文档可以在此处找到:https://spark.apache.org/docs/2.4.4/sql-data-sources-jdbc.html :) - mike
5个回答

41

我在这里找到了通过Spark SQL进行批量数据迁移

dbname参数可以是任何用别名包装在括号内的查询。所以在我的情况下,我需要这样做:

val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100) foo
"""

val df = sqlContext.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError","true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable",query).
  load()

不出所料,将每个表加载为其自己的数据帧(Dataframe),然后在Spark中连接它们非常低效。


3
如果您已经在SQLContext中注册了您的table,那么您可以直接使用sql方法。
val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")

顺便提一下,这不是你问题的要点,所以我将其作为评论添加。如果你想知道如何实现之前的步骤(连接到mysql等),你可以查看这篇文章 Spark + MySQL example - Alberto Bonsanto
谢谢。我已经找到了如何将整个表加载到Spark中。然而,我的问题是我有一个复杂的查询,连接许多大表,并仅选择几列。我希望创建一个只包含所选列的单个简单数据框架。 - opus111
1
如何使用SQLContext注册表格? - Anish

1

要将查询结果保存到新数据框中,只需将结果设置为变量:

val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")

现在,newDataFrame 是一个具有所有数据框功能的数据帧。

谢谢,但是我需要在SQL中连接多个表并仅选择少量列。我不想将每个表都加载到Spark中。我能否创建一个通过查询加载的SQL表?请查看我添加到问题中的细节。 - opus111

0

简述:只需在数据库中创建一个视图。

详细描述: 我在我的Postgres数据库中有一个名为t_city的表,我在其上创建了一个视图:

create view v_city_3500 as
    select asciiname, country, population, elevation
    from t_city
    where elevation>3500
    and population>100000

select * from v_city_3500;

 asciiname | country | population | elevation
-----------+---------+------------+-----------
 Potosi    | BO      |     141251 |      3967
 Oruro     | BO      |     208684 |      3936
 La Paz    | BO      |     812799 |      3782
 Lhasa     | CN      |     118721 |      3651
 Puno      | PE      |     116552 |      3825
 Juliaca   | PE      |     245675 |      3834

在spark-shell中:
val sx= new org.apache.spark.sql.SQLContext(sc)

var props=new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver" )
val url="jdbc:postgresql://buya/dmn?user=dmn&password=dmn"

val city_df=sx.read.jdbc(url=url,table="t_city",props)
val city_3500_df=sx.read.jdbc(url=url,table="v_city_3500",props)

结果:

city_df.count()
Long = 145725

city_3500_df.count()
Long = 6

0

使用MYSQL读取/加载数据,类似于以下内容

val conf = new SparkConf().setAppName("SparkMe Application").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val jdbcDF = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password",
        "dbtable" -> "TABLE_NAME")).load()

将数据写入以下表格

import java.util.Properties
    val prop = new Properties()
    prop.put("user", "<>")
    prop.put("password", "simple$123")
    val dfWriter = jdbcDF.write.mode("append")
    dfWriter.jdbc("jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "tableName", prop)

要从查询中创建数据框,请执行以下操作:

val finalModelDataDF = {
      val query = "select * from table_name"
      sqlContext.sql(query)
    };

    finalModelDataDF.show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接