使用您的“next”方法,并假定数据按升序生成,则以下内容可以并行工作,但如果实际上更快,请告诉我;我不知道您的数据量。无论如何,您不能仅使用SQL(%sql)来解决问题。下面是代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class X(identifier: Long, line: Long)
val df = Seq(
(1000000, 23), (1200, 56), (1201, 58), (1202, 60),
(8200, 63), (890000, 67), (990000, 99), (33000, 123),
(33001, 124), (33002, 126), (33009, 132), (33019, 133),
(33029, 134), (33039, 135), (800, 201), (1800, 999),
(1801, 1999), (1802, 2999), (1800444, 9999)
).toDF("identifier", "line")
val df2 = df.as[X]
.rdd
.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x ))
}).mapValues(v => (v.identifier, v.line)).map(x => (x._1, x._2._1, x._2._2))
.toDF("part", "identifier", "line")
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("part").orderBy("line")
val df3 = df2.withColumn("next", lead("line", 1, null).over(w))
val df4 = df3.filter(df3("part") =!= 0).groupBy("part").agg(min("line").as("nxt")).toDF("pt", "nxt")
val df5 = df3.join(df4, (df3("part") === df4("pt") - 1), "outer" )
val df6 = df5.withColumn("next", when(col("next").isNull, col("nxt")).otherwise(col("next"))).select("identifier", "line", "next")
df6.show(false)
返回:
+----------+----+----+
|identifier|line|next|
+----------+----+----+
|1000000 |23 |56 |
|1200 |56 |58 |
|1201 |58 |60 |
|1202 |60 |63 |
|8200 |63 |67 |
|890000 |67 |99 |
|990000 |99 |123 |
|33000 |123 |124 |
|33001 |124 |126 |
|33002 |126 |132 |
|33009 |132 |133 |
|33019 |133 |134 |
|33029 |134 |135 |
|33039 |135 |201 |
|800 |201 |999 |
|1800 |999 |1999|
|1801 |1999|2999|
|1802 |2999|9999|
|1800444 |9999|null|
+----------+----+----+
您可以添加额外的排序等。在添加分区索引时依赖于狭窄转换。如何加载可能是一个问题。不考虑缓存。
如果数据未按上述方式排序,则需要先进行范围分区。