Spark结构化流程:如何将聚合DataFrame连接到DataFrame?

4

我有一个流数据框,可能在某个时刻看起来像:

+--------------------+--------------------+
|               owner|              fruits|
+--------------------+--------------------+
|Brian                | apple|
Brian                | pear |
Brian                | date|
Brian                | avocado|
Bob                | avocado|
Bob                | apple|
........
+--------------------+--------------------+

我进行了一次groupBy,使用agg collect_list来整理数据。

val myFarmDF = farmDF.withWatermark("timeStamp", "1 seconds").groupBy("fruits").agg(collect_list(col("fruits")) as "fruitsA")

输出是每个所有者的一行和每种水果的数组。 现在,我想将这个清理过的数组与原始流数据框连接起来,删除水果列,并只保留水果A列。
val joinedDF = farmDF.join(myFarmDF, "owner").drop("fruits")

这在我的脑海里看起来是可行的,但 Spark 程序好像并不同意。
我遇到了一个

错误。
Failure when resolving conflicting references in Join:
'Join Inner
...
+- AnalysisBarrier
      +- Aggregate [name#17], [name#17, collect_list(fruits#61, 0, 0) AS fruitA#142]

当我将所有内容转换为静态数据帧时,它可以正常工作。但在流处理的情况下是否不可行?
1个回答

0

我已经弄清楚了,忘记更新线程了。 - Brian
@Brian 这段代码对你有效吗?我同时使用 Kafka 作为源和汇,并且,如果使用 outputMode("append") ,会得到 Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;; 的错误提示,而如果使用outputMode("update") ,则会得到 Inner join between two streaming DataFrames/Datasets is not supported in Update output mode, only in Append output mode;; 的错误提示。 - redsk

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接