Spark Dataframe:选择唯一的行

4

我尝试了两种方法来从parquet中查找不同的行,但好像都不起作用。
尝试1: Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();
但是会抛出以下异常:

Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;

尝试2: 尝试运行SQL查询:

Dataset<Row> df = sqlContext.read().parquet("location.parquet");
    rawLandingDS.createOrReplaceTempView("df");
    Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");

我得到的错误信息:

= SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^

在读取Parquet文件时,是否有获取不重复记录的方法? 有没有可以使用的读取选项。

3个回答

7

你面临的问题在异常信息中已经明确说明了 - 因为 MapType 列既不可哈希也不可排序,不能作为分组或分区表达式的一部分。

你对 SQL 解决方案的理解与基于一组兼容列的数据去重的 dropDuplicates 不符合逻辑等价关系:

df.dropDuplicates("timestamp")

这相当于

SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp

不幸的是,如果你的目标是实际使用DISTINCT,它并不会那么容易。一种可能的解决方案是利用Scala* Map 哈希技术。你可以定义一个像这样的Scala udf:

spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

然后在您的Java代码中使用它来派生列,以便可以用于dropDuplicates

 df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp",  "c1", "c2", ..., "cn" 
    // Hash used as surrogate of canvasHashes
    "hash_of_canvas_hashes"         
  )

与 SQL 等效

SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes

请注意,带有其hashCode的java.util.Map将无法正常工作,因为hashCode不一致。

4

1)如果您想根据列进行区分,可以使用它

val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")


scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+

val distinctValuesDF = df.select(df("no")).distinct

scala> distinctValuesDF.show
+---+
| no|
+---+
|  1|
|  3|
+---+

2) 如果你想在所有列上都有唯一值,请使用dropduplicate。

scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")



scala> df.show

+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  3|  4|
|  1|  6|
+---+---+


scala> df.dropDuplicates().show()
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+

2

是的,语法不正确,应该是:

最初的回答

Dataset<Row> landingDF = sqlContext.sql("SELECT distinct * from df");

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接