如何在Spark RDD中获得一个SQL的row_number等价函数?

28

我需要为一个包含多列数据表生成一份完整的行数列表。

在 SQL 中,这会像这样:

select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;

现在,假设我在Spark中有一个形式为(K, V)的RDD,其中V=(col1, col2, col3),那么我的条目就像下面这样:

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

我希望使用sortBy()、sortWith()、sortByKey()、zipWithIndex等命令来排序,以获得正确的行号,并生成一个新的RDD。
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

我不在乎括号,所以形式也可以是 (K, (col1,col2,col3,rownum))。

我该怎么做呢?

以下是我的第一次尝试:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

请注意,函数sortBy不能直接应用于RDD,但必须先运行collect(),然后输出结果不是RDD,而是数组。

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)

这里有一些进展,但还没有进行分区。
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)

这个问题是其他几个部分回答的问题的扩展,即https://dev59.com/vWAg5IYBdhLWcg3wOoxQ、http://qnalist.com/questions/5086896/spark-sql-how-to-select-first-row-in-each-group-by-group、http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CD01B658B.2BF52%25silvio.fiorito@granturing.com%3E、https://dev59.com/hV8d5IYBdhLWcg3wQwa5、https://dev59.com/_2Af5IYBdhLWcg3wbCF3。 - Glenn Strycker
我也在寻找答案。Hive在0.11版本中添加了分析函数(包括row_number()HIVE-896,而Spark 1.1支持HiveQL / Hive 0.12。因此,似乎sqlContext.hql("select row_number() over(partition by ...应该可以工作,但我却遇到了错误。 - dnlbrky
4个回答

28

在Spark 1.4中添加了row_number() over (partition by ... order by ...)功能。本答案使用PySpark/DataFrames。

创建一个测试DataFrame:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

添加分区行号:

from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()
         .over(Window
               .partitionBy("k")
               .orderBy("k")
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+

5

您提出了一个有趣的问题。我将用Python来回答,但我相信您能够顺畅地将其翻译成Scala。

以下是我解决它的方法:

1- 简化您的数据:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))

现在,temp2是一个“真正的”键值对。它看起来像这样:

[
((3, 4), (5, 5, 5)),  
((3, 4), (5, 5, 9)),   
((3, 4), (7, 5, 5)),   
((1, 2), (1, 2, 3)),  
((1, 2), (1, 4, 7)),   
((1, 2), (2, 2, 3))

2- 接着,使用 group-by 函数来复制 PARTITION BY 的效果:

temp3 = temp2.groupByKey()

现在,temp3是一个包含2行的RDD:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
 ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]

3- 现在,您需要为RDD的每个值应用一个排名函数。 在Python中,我会使用简单的sorted函数(枚举将创建您的行号列):

 temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)

请注意,要实现您特定的订单,您需要提供正确的“key”参数(在Python中,我只需创建像这样的lambda函数:
lambda tuple : (tuple[0],-tuple[1],tuple[2])

在结尾处(如果没有关键字参数函数,它看起来像这样):
[
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2))

希望能帮到你!祝你好运。

第三步真是太棒了! - Michael Szczepaniak

1
val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))

测试: Seq [(String,(Int,Int,Int))] = List((key1,(1,2,3)),(key1,(4,5,6)),(key2,(7,8,9)),(key2,(0,1,2)))

test.foreach(println)

(key1,(1,2,3))

(key1,(4,5,6))

(key2,(7,8,9))

(key2,(0,1,2))

val rdd = sc.parallelize(test, 2)

rdd: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ParallelCollectionRDD[41] at parallelize at :26

解释:这是一个Spark RDD对象的定义,其中包含一个元组,元组中有一个字符串和三个整数。这个RDD对象是一个并行集合RDD对象,编号为41,在26行的parallelize方法中创建。
val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))

rdd1: org.apache.spark.rdd.RDD[(String, Array[((Int, Int, Int), Int)])] = MapPartitionsRDD[44] at map at :25

rdd1:org.apache.spark.rdd.RDD[(字符串,数组[((整数,整数,整数),整数)])] = MapPartitionsRDD[44]在map在:25

val rdd2 = rdd1.flatMap{ 
  elem =>
   val key = elem._1
   elem._2.map(row => (key, row._1, row._2))
 }

rdd2:org.apache.spark.rdd.RDD [(String,(Int,Int,Int),Int)] = MapPartitionsRDD [45] at flatMap at:25

解释:这是一个关于Spark编程的代码行,其中rdd2是一个RDD对象,其元素为一个三元组和一个整数。该RDD对象是通过对原始数据进行flatMap操作得到的,对应的代码行数为25。
rdd2.collect.foreach(println)

(关键词1,(1,2,3),0)

(关键词1,(4,5,6),1)

(关键词2,(0,1,2),0)

(关键词2,(7,8,9),1)


0

从 Spark SQL,读取数据文件...
val df = spark.read.json("s3://s3bukcet/key/activity/year=2018/month=12/date=15/*");

以上文件包含字段 user_id、pageviews 和 clicks。

按 user_id 分区并按 clicks 排序生成活动 ID(行号)

val output = df.withColumn("activity_id", functions.row_number().over(Window.partitionBy("user_id").orderBy("clicks")).cast(DataTypes.IntegerType));

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接