Spark结构化流 - 将静态数据集与流数据集进行连接

14
我正在使用 Spark 结构化流处理从 Kafka 读取的记录。以下是我想要实现的内容:
(a) 每个记录都是类型为 (Timestamp, DeviceId) 的 Tuple2。
(b) 我已创建一个静态 Dataset[DeviceId],其中包含了预期在 Kafka 流中看到的所有有效设备 ID(设备 ID 的类型为 DeviceId)。
(c) 我需要编写一个 Spark 结构化流查询,该查询应该:
 (i) Groups records by their timestamp into 5-minute windows
 (ii) For each window, get the list of valid device IDs that were **not** seen in that window
例如,假设所有有效设备ID的列表为[A,B,C,D,E],在某个5分钟窗口内kafka记录包含设备ID [A,B,E]。那么,在该窗口中,我正在寻找未见过的设备ID列表为[C,D]问题 1. 如何在Spark结构化流中编写此查询?我尝试使用Dataset公开的except()join()方法。然而,它们都抛出了一个运行时异常,指出这些操作都不支持streaming Dataset
这是我的代码片段:
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L))) 

case class KafkaRecord(timestamp: TimestampType, deviceId: DeviceId)

// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
     .withWatermark("timestamp", "5 minutes")
     .groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
     .count()
     .map(row => (row.getLong(0), 1L))
     .as[(Long, Long)]

val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
     .filter(row => row.isNullAt(1))
     .map(row => row.getLong(0))

最后一个语句会抛出以下异常:

Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;

提前致谢。


静态数据集有多大? - stefanobaghino
可能有几万个设备ID(例如:10K-50K) - jithinpt
在Spark中使用leftAnti或rightAnti连接。 - Suresh
2个回答

6
在Spark Structured Streaming中,关于“join操作”的情况如下:流式的DataFrames可以和静态的DataFrames进行连接,从而创建新的streaming DataFrames。但是,streaming Dataset与静态Datasets之间的外连接是有条件支持的,而对于streaming Dataset的右/左连接,通常不受结构化流处理的支持。因此,当您尝试将静态dataset与流数据集进行连接时,会遇到AnalysisException。作为我的话的证明,您可以查看Spark源代码,该抛出异常,表示您尝试的操作不受支持。
我尝试将stream of DataFrames与静态DataFrames进行连接。
val streamingDf = sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", "structured_topic")
    .load()

val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

val staticDf = Seq((1507831462 , 100)).toDF("Timestamp", "DeviceId")

//Inner Join
streamingDf.join(staticDf, "Timestamp")
line.join(staticDf, "Timestamp")

//Left Join
streamingDf.join(staticDf, "Timestamp", "left_join")
line.join(staticDf, "Timestamp", "left_join")

如您所见,除了从Kafka消耗数据外,我还通过nc(netcat)启动的套接字读取数据,这极大地简化了在测试流应用程序时的工作。对我来说,这种方法在使用Kafkasocket作为数据来源时都很有效。
希望这有所帮助。

谢谢您的回复。但是,我认为这并不能解决我的使用情况。我需要查找静态数据集中存在但流式数据集中不存在的设备ID。如果我在左侧执行左连接,我不会得到这个结果,对吗?除了连接操作之外,在spark结构化流中还有其他实现这种逻辑的方法吗?再次感谢。 - jithinpt
根据Spark规范,您可以使用“结构化流”和“静态数据框架”进行左外连接,但不能使用“数据集”,请尝试将“数据框架”转换为“数据集”并模拟连接操作 - https://spark.apache.org/docs/2.0.1/structured-streaming-programming-guide.html#join-operations - user6860682

2

对于相反的流式数据集,在外部连接方面不支持

  • 流式数据集和静态数据集之间的外部连接是有条件支持的。
    • 不支持与流式数据集进行完全外部连接
    • 不支持在右侧使用流式数据集进行左外部连接
    • 不支持在左侧使用流式数据集进行右外部连接

如果其他Dataset很小,您可以使用Map或类似的结构,broadcast,并在UserDefinedFunction内引用它。

val map: Broadcast[Map[T, U]] = ???
val lookup = udf((x: T) => map.value.get(x))

df.withColumn("foo", lookup($"_1"))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接