我正在使用 Spark 结构化流处理从 Kafka 读取的记录。以下是我想要实现的内容:
(a) 每个记录都是类型为 (Timestamp, DeviceId) 的 Tuple2。
(b) 我已创建一个静态 Dataset[DeviceId],其中包含了预期在 Kafka 流中看到的所有有效设备 ID(设备 ID 的类型为 DeviceId)。
(c) 我需要编写一个 Spark 结构化流查询,该查询应该:
这是我的代码片段:
(a) 每个记录都是类型为 (Timestamp, DeviceId) 的 Tuple2。
(b) 我已创建一个静态 Dataset[DeviceId],其中包含了预期在 Kafka 流中看到的所有有效设备 ID(设备 ID 的类型为 DeviceId)。
(c) 我需要编写一个 Spark 结构化流查询,该查询应该:
(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get the list of valid device IDs that were **not** seen in that window
例如,假设所有有效设备ID的列表为[A,B,C,D,E]
,在某个5分钟窗口内kafka记录包含设备ID [A,B,E]
。那么,在该窗口中,我正在寻找未见过的设备ID列表为[C,D]
。
问题
1. 如何在Spark结构化流中编写此查询?我尝试使用Dataset
公开的except()
和join()
方法。然而,它们都抛出了一个运行时异常,指出这些操作都不支持streaming Dataset
。这是我的代码片段:
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L)))
case class KafkaRecord(timestamp: TimestampType, deviceId: DeviceId)
// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
.withWatermark("timestamp", "5 minutes")
.groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
.count()
.map(row => (row.getLong(0), 1L))
.as[(Long, Long)]
val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
.filter(row => row.isNullAt(1))
.map(row => row.getLong(0))
最后一个语句会抛出以下异常:
Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;
提前致谢。