Spark: DataFrame aggregation (Scala)

I need to aggregate Spark DataFrame data, implemented in Scala, and I have two datasets.
Dataset 1 holds the value of each "t" type (val1, val2, ...) spread across separate columns (t1, t2, ...):
val data1 = Seq(
    ("1","111",200,"221",100,"331",1000),
    ("2","112",400,"222",500,"332",1000),
    ("3","113",600,"223",1000,"333",1000)
).toDF("id1","t1","val1","t2","val2","t3","val3")

data1.show()

+---+---+----+---+----+---+----+
|id1| t1|val1| t2|val2| t3|val3|
+---+---+----+---+----+---+----+
|  1|111| 200|221| 100|331|1000|
|  2|112| 400|222| 500|332|1000|
|  3|113| 600|223|1000|333|1000|
+---+---+----+---+----+---+----+    

Dataset 2 represents the same content with one row per "t" type:

val data2 = Seq(("1","111",200),("1","221",100),("1","331",1000),
  ("2","112",400),("2","222",500),("2","332",1000),
  ("3","113",600),("3","223",1000), ("3","333",1000)
).toDF("id*","t*","val*")

data2.show()    

+---+---+----+
|id*| t*|val*|
+---+---+----+
|  1|111| 200|
|  1|221| 100|
|  1|331|1000|
|  2|112| 400|
|  2|222| 500|
|  2|332|1000|
|  3|113| 600|
|  3|223|1000|
|  3|333|1000|
+---+---+----+      

Now I need to group by the (id, t, t*) fields and print sum(val) and sum(val*) as separate records; the two sums should be equal.
My expected output looks like this:
+---+---+--------+---+---------+
|id1| t |sum(val)| t*|sum(val*)|
+---+---+--------+---+---------+
|  1|111|     200|111|      200|
|  1|221|     100|221|      100|
|  1|331|    1000|331|     1000|
|  2|112|     400|112|      400|
|  2|222|     500|222|      500|
|  2|332|    1000|332|     1000|
|  3|113|     600|113|      600|
|  3|223|    1000|223|     1000|
|  3|333|    1000|333|     1000|
+---+---+--------+---+---------+

I considered exploding dataset 1 into one record per "t" type and then joining it with dataset 2. But can you suggest a better approach that avoids a performance hit as the datasets grow?
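To make the intended reshape concrete, here is a plain-Scala illustration (collections only, no Spark; the object name is just for the example) of turning the wide dataset 1 rows into the long shape of dataset 2:

```scala
// Plain-Scala sketch of the wide-to-long reshape described above.
object MeltIllustration extends App {
  // rows of dataset 1: (id1, t1, val1, t2, val2, t3, val3)
  val data1 = Seq(
    ("1", "111", 200, "221", 100, "331", 1000),
    ("2", "112", 400, "222", 500, "332", 1000),
    ("3", "113", 600, "223", 1000, "333", 1000))

  // each wide row becomes three (id, t, val) rows -- exactly the shape of dataset 2
  val long = data1.flatMap { case (id, t1, v1, t2, v2, t3, v3) =>
    Seq((id, t1, v1), (id, t2, v2), (id, t3, v3))
  }

  long.foreach(println)
}
```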

1 Answer

The simplest solution is to run a subquery per column pair and then union the datasets:
val ts = Seq(1, 2, 3)
// one narrow (id1, t, val) projection per column pair
val dfs = ts.map(t => data1.select(col("id1"), col("t" + t) as "t", col("val" + t) as "val"))
val unioned = dfs.tail.foldLeft(dfs.head)((l, r) => l.union(r))

val ds = unioned.join(data2, 't === col("t*"))
  .groupBy('id1, 't, col("t*"))
  .agg(sum("val") as "sum(val)", sum("val*") as "sum(val*)")

You can also try the explode function together with an array:

val df1 = data1.withColumn("t", explode(array('t1, 't2, 't3)))

val ds = df1.withColumn("val",
          when('t === 't1, 'val1)
          .when('t === 't2, 'val2)
          .when('t === 't3, 'val3)
          .otherwise(0))
          .select('id1, 't, 'val)

The last step is to join this dataset with data2:
ds.join(data2, 't === col("t*"))
  .groupBy("t", "t*")
  .agg(first("id1") as "id1", sum("val"), sum("val*"))
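For completeness, the explode approach can be sketched end to end as a self-contained program, assuming a local SparkSession (the object name, app name, and `local[*]` master are illustrative, not from the original answer):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object AggregateDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("agg-demo").getOrCreate()
    import spark.implicits._

    val data1 = Seq(
      ("1", "111", 200, "221", 100, "331", 1000),
      ("2", "112", 400, "222", 500, "332", 1000),
      ("3", "113", 600, "223", 1000, "333", 1000)
    ).toDF("id1", "t1", "val1", "t2", "val2", "t3", "val3")

    val data2 = Seq(
      ("1", "111", 200), ("1", "221", 100), ("1", "331", 1000),
      ("2", "112", 400), ("2", "222", 500), ("2", "332", 1000),
      ("3", "113", 600), ("3", "223", 1000), ("3", "333", 1000)
    ).toDF("id*", "t*", "val*")

    // melt data1: one (id1, t, val) row per t-column
    val long = data1
      .withColumn("t", explode(array($"t1", $"t2", $"t3")))
      .withColumn("val",
        when($"t" === $"t1", $"val1")
          .when($"t" === $"t2", $"val2")
          .otherwise($"val3"))
      .select($"id1", $"t", $"val")

    // join on id and t, then aggregate both value columns
    long.join(data2, $"id1" === col("id*") && $"t" === col("t*"))
      .groupBy($"id1", $"t", col("t*"))
      .agg(sum($"val") as "sum(val)", sum(col("val*")) as "sum(val*)")
      .orderBy($"id1", $"t")
      .show()

    spark.stop()
  }
}
```

Joining on id as well as t guards against the same t value appearing under different ids, which the t-only join in the answer above would silently cross-match.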

Page content provided by Stack Overflow; translated from the original English question.