使用Spark Dataframe如何进行等值检查而不使用SQL查询?

55

我想选择一个等于特定值的列。我正在用Scala编写代码,但遇到了一些问题。

这是我的代码

df.select(df("state")==="TX").show()

这将返回一个状态列,带有布尔值而不仅仅是 TX。

我还尝试过:

df.select(df("state")=="TX").show() 

但这也不起作用。

10个回答

74

我遇到了同样的问题,以下语法对我有效:

df.filter(df("state")==="TX").show()

我正在使用Spark 1.6。


6
在1.6版本中这对我不起作用,我必须使用方括号来筛选数据框,例如 df.filter(df["state"]=="TX").show() - Katya Willard
你建议的格式对我来说无法编译。你确定 df 的类型是 DataFrame 吗? - user3487888
4
带有括号的语法是 Scala,带有方括号的语法是 PySpark。 - PinoSan

29

还有一种类似 SQL 的简单选项。对于 Spark 1.6 及以下版本也应该适用。

df.filter("state = 'TX'")

这是一种新的指定SQL过滤器的方法。要获取支持的操作符的完整列表,请查看此类


刚刚看了一下链接,你是指第94行的运算符吗?你有什么想法可以告诉我如何使用它们或者找到有关如何使用它们的信息吗? - Topde

16

你应该使用whereselect是一个投影操作,它返回语句的输出,因此你会得到布尔值。 where是一个过滤器,它保留数据框的结构,但只保留过滤器起作用的数据。

然而,根据文档,你可以用三种不同的方式来编写这个操作。

// The following are equivalent:
peopleDf.filter($"age" > 15)
peopleDf.where($"age" > 15)
peopleDf($"age" > 15)

1
我刚刚尝试了df.filter($"state"=="TX"),但是它不起作用。错误:filter方法有多个重载形式: (conditionExpr: String)org.apache.spark.sql.DataFrame <和> (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame 不能应用于(Boolean)。 - Instinct
这个似乎可以工作。df.select("state").where(df("state")==="TX").count - Instinct
12
df.filter($"state"==="TX") 应该可以正常工作。在这里,你需要使用三个等号,它会返回一个列 -> https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.Column - Justin Pihony

10

要得到否定,按照以下步骤进行...

df.filter(not( ..expression.. ))

例如

df.filter(not($"state" === "TX"))

3
你可以使用df.filter($"state" !== "TX")来表示不等于的条件筛选。 - stevevls

9

df.filter($"state" like "T%%") 用于模式匹配

df.filter($"state" === "TX")df.filter("state = 'TX'") 用于相等性比较


6

曾经在Spark V2.*上工作过。

import sqlContext.implicits._
df.filter($"state" === "TX")

如果需要与变量(例如var)进行比较:

import sqlContext.implicits._
df.filter($"state" === var)

注意: import sqlContext.implicits._

5
我们可以在Dataframe中编写多个过滤/where条件。
例如:
table1_df
.filter($"Col_1_name" === "buddy")  // check for equal to string
.filter($"Col_2_name" === "A")
.filter(not($"Col_2_name".contains(" .sql")))  // filter a string which is    not relevent
.filter("Col_2_name is not null")   // no null filter
.take(5).foreach(println)

2
让我们创建一个样本数据集,并深入探讨为什么OP的代码没有起作用。
这是我们的样本数据:
val df = Seq(
  ("Rockets", 2, "TX"),
  ("Warriors", 6, "CA"),
  ("Spurs", 5, "TX"),
  ("Knicks", 2, "NY")
).toDF("team_name", "num_championships", "state")

我们可以使用show()方法对数据集进行漂亮的打印:
+---------+-----------------+-----+
|team_name|num_championships|state|
+---------+-----------------+-----+
|  Rockets|                2|   TX|
| Warriors|                6|   CA|
|    Spurs|                5|   TX|
|   Knicks|                2|   NY|
+---------+-----------------+-----+

让我们来看一下 df.select(df("state")==="TX").show() 的结果:
+------------+
|(state = TX)|
+------------+
|        true|
|       false|
|        true|
|       false|
+------------+

通过添加一列来更容易理解此结果 - df.withColumn("is_state_tx", df("state")==="TX").show()

+---------+-----------------+-----+-----------+
|team_name|num_championships|state|is_state_tx|
+---------+-----------------+-----+-----------+
|  Rockets|                2|   TX|       true|
| Warriors|                6|   CA|      false|
|    Spurs|                5|   TX|       true|
|   Knicks|                2|   NY|      false|
+---------+-----------------+-----+-----------+

OP尝试了另一段代码 (df.select(df("state")=="TX").show()),返回了以下错误:

<console>:27: error: overloaded method value select with alternatives:
  [U1](c1: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U1])org.apache.spark.sql.Dataset[U1] <and>
  (col: String,cols: String*)org.apache.spark.sql.DataFrame <and>
  (cols: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
 cannot be applied to (Boolean)
       df.select(df("state")=="TX").show()
          ^
< p > === 运算符在 Column class 中定义。Column类没有定义 == 运算符,这就是为什么此代码出错的原因。

以下是可行的答案:

df.filter(df("state")==="TX").show()

+---------+-----------------+-----+
|team_name|num_championships|state|
+---------+-----------------+-----+
|  Rockets|                2|   TX|
|    Spurs|                5|   TX|
+---------+-----------------+-----+

正如其他帖子中提到的那样, === 方法需要一个 Any 类型的参数,因此这并不是唯一的可行解决方案。例如,下面的方法也可以用于此情况:
df.filter(df("state") === lit("TX")).show

+---------+-----------------+-----+
|team_name|num_championships|state|
+---------+-----------------+-----+
|  Rockets|                2|   TX|
|    Spurs|                5|   TX|
+---------+-----------------+-----+

列(Column)的equalTo方法也可以使用:

df.filter(df("state").equalTo("TX")).show()

+---------+-----------------+-----+
|team_name|num_championships|state|
+---------+-----------------+-----+
|  Rockets|                2|   TX|
|    Spurs|                5|   TX|
+---------+-----------------+-----+

值得详细研究这个例子。Scala的语法有时似乎很神奇,特别是在方法被调用时没有点符号。对于未经训练的眼睛来说,很难看出 === 是在 Column 类中定义的一个方法!

0

这里是使用Spark2.2+处理以JSON格式获取数据的完整示例...

val myjson = "[{\"name\":\"Alabama\",\"abbreviation\":\"AL\"},{\"name\":\"Alaska\",\"abbreviation\":\"AK\"},{\"name\":\"American Samoa\",\"abbreviation\":\"AS\"},{\"name\":\"Arizona\",\"abbreviation\":\"AZ\"},{\"name\":\"Arkansas\",\"abbreviation\":\"AR\"},{\"name\":\"California\",\"abbreviation\":\"CA\"},{\"name\":\"Colorado\",\"abbreviation\":\"CO\"},{\"name\":\"Connecticut\",\"abbreviation\":\"CT\"},{\"name\":\"Delaware\",\"abbreviation\":\"DE\"},{\"name\":\"District Of Columbia\",\"abbreviation\":\"DC\"},{\"name\":\"Federated States Of Micronesia\",\"abbreviation\":\"FM\"},{\"name\":\"Florida\",\"abbreviation\":\"FL\"},{\"name\":\"Georgia\",\"abbreviation\":\"GA\"},{\"name\":\"Guam\",\"abbreviation\":\"GU\"},{\"name\":\"Hawaii\",\"abbreviation\":\"HI\"},{\"name\":\"Idaho\",\"abbreviation\":\"ID\"},{\"name\":\"Illinois\",\"abbreviation\":\"IL\"},{\"name\":\"Indiana\",\"abbreviation\":\"IN\"},{\"name\":\"Iowa\",\"abbreviation\":\"IA\"},{\"name\":\"Kansas\",\"abbreviation\":\"KS\"},{\"name\":\"Kentucky\",\"abbreviation\":\"KY\"},{\"name\":\"Louisiana\",\"abbreviation\":\"LA\"},{\"name\":\"Maine\",\"abbreviation\":\"ME\"},{\"name\":\"Marshall Islands\",\"abbreviation\":\"MH\"},{\"name\":\"Maryland\",\"abbreviation\":\"MD\"},{\"name\":\"Massachusetts\",\"abbreviation\":\"MA\"},{\"name\":\"Michigan\",\"abbreviation\":\"MI\"},{\"name\":\"Minnesota\",\"abbreviation\":\"MN\"},{\"name\":\"Mississippi\",\"abbreviation\":\"MS\"},{\"name\":\"Missouri\",\"abbreviation\":\"MO\"},{\"name\":\"Montana\",\"abbreviation\":\"MT\"},{\"name\":\"Nebraska\",\"abbreviation\":\"NE\"},{\"name\":\"Nevada\",\"abbreviation\":\"NV\"},{\"name\":\"New Hampshire\",\"abbreviation\":\"NH\"},{\"name\":\"New Jersey\",\"abbreviation\":\"NJ\"},{\"name\":\"New Mexico\",\"abbreviation\":\"NM\"},{\"name\":\"New York\",\"abbreviation\":\"NY\"},{\"name\":\"North Carolina\",\"abbreviation\":\"NC\"},{\"name\":\"North Dakota\",\"abbreviation\":\"ND\"},{\"name\":\"Northern Mariana Islands\",\"abbreviation\":\"MP\"},{\"name\":\"Ohio\",\"abbreviation\":\"OH\"},{\"name\":\"Oklahoma\",\"abbreviation\":\"OK\"},{\"name\":\"Oregon\",\"abbreviation\":\"OR\"},{\"name\":\"Palau\",\"abbreviation\":\"PW\"},{\"name\":\"Pennsylvania\",\"abbreviation\":\"PA\"},{\"name\":\"Puerto Rico\",\"abbreviation\":\"PR\"},{\"name\":\"Rhode Island\",\"abbreviation\":\"RI\"},{\"name\":\"South Carolina\",\"abbreviation\":\"SC\"},{\"name\":\"South Dakota\",\"abbreviation\":\"SD\"},{\"name\":\"Tennessee\",\"abbreviation\":\"TN\"},{\"name\":\"Texas\",\"abbreviation\":\"TX\"},{\"name\":\"Utah\",\"abbreviation\":\"UT\"},{\"name\":\"Vermont\",\"abbreviation\":\"VT\"},{\"name\":\"Virgin Islands\",\"abbreviation\":\"VI\"},{\"name\":\"Virginia\",\"abbreviation\":\"VA\"},{\"name\":\"Washington\",\"abbreviation\":\"WA\"},{\"name\":\"West Virginia\",\"abbreviation\":\"WV\"},{\"name\":\"Wisconsin\",\"abbreviation\":\"WI\"},{\"name\":\"Wyoming\",\"abbreviation\":\"WY\"}]"
import spark.implicits._
val df = spark.read.json(Seq(myjson).toDS)
df.show 
   import spark.implicits._
    val df = spark.read.json(Seq(myjson).toDS)
    df.show

    scala> df.show
    +------------+--------------------+
    |abbreviation|                name|
    +------------+--------------------+
    |          AL|             Alabama|
    |          AK|              Alaska|
    |          AS|      American Samoa|
    |          AZ|             Arizona|
    |          AR|            Arkansas|
    |          CA|          California|
    |          CO|            Colorado|
    |          CT|         Connecticut|
    |          DE|            Delaware|
    |          DC|District Of Columbia|
    |          FM|Federated States ...|
    |          FL|             Florida|
    |          GA|             Georgia|
    |          GU|                Guam|
    |          HI|              Hawaii|
    |          ID|               Idaho|
    |          IL|            Illinois|
    |          IN|             Indiana|
    |          IA|                Iowa|
    |          KS|              Kansas|
    +------------+--------------------+

    // equals matching
    scala> df.filter(df("abbreviation") === "TX").show
    +------------+-----+
    |abbreviation| name|
    +------------+-----+
    |          TX|Texas|
    +------------+-----+
    // or using lit

    scala> df.filter(df("abbreviation") === lit("TX")).show
    +------------+-----+
    |abbreviation| name|
    +------------+-----+
    |          TX|Texas|
    +------------+-----+

    //not expression
    scala> df.filter(not(df("abbreviation") === "TX")).show
    +------------+--------------------+
    |abbreviation|                name|
    +------------+--------------------+
    |          AL|             Alabama|
    |          AK|              Alaska|
    |          AS|      American Samoa|
    |          AZ|             Arizona|
    |          AR|            Arkansas|
    |          CA|          California|
    |          CO|            Colorado|
    |          CT|         Connecticut|
    |          DE|            Delaware|
    |          DC|District Of Columbia|
    |          FM|Federated States ...|
    |          FL|             Florida|
    |          GA|             Georgia|
    |          GU|                Guam|
    |          HI|              Hawaii|
    |          ID|               Idaho|
    |          IL|            Illinois|
    |          IN|             Indiana|
    |          IA|                Iowa|
    |          KS|              Kansas|
    +------------+--------------------+
    only showing top 20 rows

-1
在Spark 2.4中,
与一个值进行比较:
df.filter(lower(trim($"col_name")) === "<value>").show()

与一组值进行比较:

df.filter($"col_name".isInCollection(new HashSet<>(Arrays.asList("value1", "value2")))).show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接