如何将流式数据集写入Kafka?

5

我正在尝试对话题数据进行一些丰富。因此,使用Spark结构化流从Kafka汇集读取并将其发送回Kafka。

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)].map(record => enrich(record._1,
      record._2, record._3)

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

但我遇到了一个异常:
Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

有什么解决办法吗?
3个回答

3

目前最新版本的Spark 2.1不支持Kafka Writer,但是下一个版本2.2将会支持,具体信息可以查看此次提交记录

Kafka Sink和Kafka Writer是相同的。


3
T. Gawęda所述,目前没有kafka格式可用于将流数据集写入Kafka(即Kafka sink)。
在Spark 2.1中,目前推荐的解决方案是使用foreach operator

foreach操作允许对输出数据执行任意操作。截至Spark 2.1,此功能仅适用于Scala和Java。要使用此功能,您必须实现接口ForeachWriter(Scala / Java文档),该接口具有在触发器生成一系列行输出后调用的方法。请注意以下重要点。


我很想看到实现。完成后,能否简要说明一下?谢谢! - Jacek Laskowski
2
我正在使用来自夜间构建的Spark 2.2快照版本,我通过添加Apache快照存储库从Maven引用了它。现在我将测试Kafka同步,稍后如果您想的话,我可以分享我的经验=) - Danilow

0

试试这个

ds.map(_.toString.getBytes).toDF("value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092"))
      .option("topic", topic)
      .start
      .awaitTermination()

更详细地解释你的解决方案,让人们更好地理解它。 - TheParam
你只需要将数据集转换回带有列名“value”的DataFrame即可,因为Kafka要求键值对中value是必填字段。 - Ayush Hooda

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接