Spark Streaming:从具有多个模式的Kafka读取数据

6

我正在努力实现Spark Streaming技术。

Kafka传来的信息是这样的,但包含了更多的字段。

{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}

我正在尝试从Kafka主题中读取消息(该主题具有多个模式)。我需要读取每个消息并查找事件和源字段,并决定将其存储为数据集。实际数据在负载字段中作为JSON存在,仅为单个记录。

有人能帮我实现这个或其他替代方案吗?

将具有多个模式的消息发送到同一主题并进行消费是否是一个好方法?

提前致谢,


{btsdaf} - Nilesh
你是否在使用Avro模式?如果是,Avro具有模式演化功能可以解决你的问题。如果没有,尝试使用Avro模式。 - moon
{btsdaf} - koiralo
2个回答

1
你可以从传入的JSON对象创建一个 Dataframe
创建一个JSON对象的 Seq[String]
使用 val df=spark.read.json[Seq[String]]
对你选择的 dataframe df 执行操作。

0

如果您只关心一些列,将JsonString转换为JavaBean


{btsdaf} - koiralo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接