Spark数据框按组分组并创建新的指标列。

4

我需要按“KEY”列进行分组,并检查“TYPE_CODE”列是否同时具有“PL”和“JL”的值,如果是,则需要添加一个指示器列为“Y”,否则为“N”

例子:

    //Input Values
    val values = List(List("66","PL") ,
    List("67","JL") , List("67","PL"),List("67","PO"),
    List("68","JL"),List("68","PO")).map(x =>(x(0), x(1)))

    import spark.implicits._
    //created a dataframe
    val cmc = values.toDF("KEY","TYPE_CODE")

    cmc.show(false)
    ------------------------
    KEY |TYPE_CODE  |
    ------------------------
    66  |PL |
    67  |JL |
    67  |PL |
    67  |PO |
    68  |JL |
    68  |PO |
    -------------------------

预期输出:

每个"KEY",如果它的"TYPE_CODE"既有PL又有JL,则为Y,否则为N。

    -----------------------------------------------------
    KEY |TYPE_CODE  | Indicator
    -----------------------------------------------------
    66  |PL         | N
    67  |JL         | Y
    67  |PL         | Y
    67  |PO         | Y
    68  |JL         | N
    68  |PO         | N
    ---------------------------------------------------

例如,67同时具有PL和JL - 所以是“Y” 66只有PL - 所以是“N” 68只有JL - 所以是“N”
2个回答

4

一种选项:

1)将TYPE_CODE收集为列表;

2)检查它是否包含特定的字符串;

3)然后使用explode展开列表:

(cmc.groupBy("KEY")
    .agg(collect_list("TYPE_CODE").as("TYPE_CODE"))
    .withColumn("Indicator", 
        when(array_contains($"TYPE_CODE", "PL") && array_contains($"TYPE_CODE", "JL"), "Y").otherwise("N"))
    .withColumn("TYPE_CODE", explode($"TYPE_CODE"))).show
+---+---------+---------+
|KEY|TYPE_CODE|Indicator|
+---+---------+---------+
| 68|       JL|        N|
| 68|       PO|        N|    
| 67|       JL|        Y|
| 67|       PL|        Y|
| 67|       PO|        Y|
| 66|       PL|        N|
+---+---------+---------+

4

另一个选项:

  1. KEY分组并使用agg创建两个单独的指标列(一个为JL,一个为PL),然后计算组合指标

  2. 与原始数据框进行join

总之:

val indicators = cmc.groupBy("KEY").agg(
  sum(when($"TYPE_CODE" === "PL", 1).otherwise(0)) as "pls",
  sum(when($"TYPE_CODE" === "JL", 1).otherwise(0)) as "jls"
).withColumn("Indicator", when($"pls" > 0 && $"jls" > 0, "Y").otherwise("N"))

val result = cmc.join(indicators, "KEY")
  .select("KEY", "TYPE_CODE", "Indicator")

这可能比@Psidom的答案慢,但可能更加安全 - 如果您有大量特定键的匹配项(该列表必须存储在单个工作程序的内存中),则collect_list可能存在问题。
编辑:如果已知输入是唯一的(即JL / PL最多只出现一次),则可以使用简单的计数聚合创建指标,这可能更容易阅读。
val indicators = cmc
  .where($"TYPE_CODE".isin("PL", "JL"))
  .groupBy("KEY").count()
  .withColumn("Indicator", when($"count" === 2, "Y").otherwise("N"))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接