在没有明显分区列的情况下,使用具有多个分区的Spark窗口。

4
这里是一个场景。假设我有以下表格:
标识符
51169081604 2
00034886044 22
51168939455 52
挑战在于,对于每个单独的列,选择下一个最大的列,我已通过以下SQL实现:
SELECT i1.line,i1.identifier, 
MAX(i1.line) OVER (
    ORDER BY i1.line ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
)AS parent
FROM global_temp.documentIdentifiers i1

挑战部分解决了,问题是当我在Spark上执行此代码时,性能非常糟糕。警告消息非常明显:没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能降低。按任何两个字段进行分区都不起作用,当然,因为每个创建的分区都不知道其他行的存在,所以它会破坏结果。有人知道如何“选择下一个最大的列线路”而不会出现性能问题吗?谢谢。

如果我添加一个分区键,那么“下一个最大列线”将由分区计算,并且我需要对整个数据集进行评估。 - Bruno Moreira
1
我明白你的意思,但是没有优雅的方法来做到这一点...请看第一个评论。我曾经尝试过使用mapPartitions或分区的方法,但是它很麻烦。 - thebluephantom
我明白了...使用mapPartition可以让我在使用mapParition内部的常规映射时仍能对最大问题有一定控制,并且仍能分配进程。我会尝试这样做,谢谢@thebluephanton。 - Bruno Moreira
使用分区,然后将边界作为额外的交换情况进行处理。 - thebluephantom
无法在SQL中完成。需要使用Spark DF API来完成。 - thebluephantom
显示剩余7条评论
1个回答

2
使用您的“next”方法,并假定数据按升序生成,则以下内容可以并行工作,但如果实际上更快,请告诉我;我不知道您的数据量。无论如何,您不能仅使用SQL(%sql)来解决问题。下面是代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

case class X(identifier: Long, line: Long) // Too hard to explain, just gets around issues with df --> rdd --> df.

// Gen some more data.
val df = Seq(
 (1000000, 23), (1200, 56), (1201, 58), (1202, 60),
 (8200, 63), (890000, 67), (990000, 99), (33000, 123),
 (33001, 124), (33002, 126), (33009, 132), (33019, 133),
 (33029, 134), (33039, 135), (800, 201), (1800, 999),
 (1801, 1999), (1802, 2999), (1800444, 9999)
 ).toDF("identifier", "line")

// Add partition so as to be able to apply parallelism - except for upper boundary record.
val df2 = df.as[X]
            .rdd
            .mapPartitionsWithIndex((index, iter) => {
                iter.map(x => (index, x ))   
             }).mapValues(v => (v.identifier, v.line)).map(x => (x._1, x._2._1, x._2._2))
            .toDF("part", "identifier", "line")

// Process per partition.
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("part").orderBy("line")  
val df3 = df2.withColumn("next", lead("line", 1, null).over(w))

// Process upper boundary.
val df4 = df3.filter(df3("part") =!= 0).groupBy("part").agg(min("line").as("nxt")).toDF("pt", "nxt")
val df5 = df3.join(df4, (df3("part") === df4("pt") - 1), "outer" )
val df6 = df5.withColumn("next", when(col("next").isNull, col("nxt")).otherwise(col("next"))).select("identifier", "line", "next")

// Display. Sort accordingly.
df6.show(false)

返回:

+----------+----+----+
|identifier|line|next|
+----------+----+----+
|1000000   |23  |56  |
|1200      |56  |58  |
|1201      |58  |60  |
|1202      |60  |63  |
|8200      |63  |67  |
|890000    |67  |99  |
|990000    |99  |123 |
|33000     |123 |124 |
|33001     |124 |126 |
|33002     |126 |132 |
|33009     |132 |133 |
|33019     |133 |134 |
|33029     |134 |135 |
|33039     |135 |201 |
|800       |201 |999 |
|1800      |999 |1999|
|1801      |1999|2999|
|1802      |2999|9999|
|1800444   |9999|null|
+----------+----+----+

您可以添加额外的排序等。在添加分区索引时依赖于狭窄转换。如何加载可能是一个问题。不考虑缓存。

如果数据未按上述方式排序,则需要先进行范围分区。


这太棒了@thebluephantom,我正在将它转换为Java,谢谢! - Bruno Moreira
优秀的意味着点赞...与 Java 相比,使用 Scala 更加容易。 - thebluephantom

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接