如何将数据集写入Kafka主题?

8
我正在使用Spark 2.1.0和Kafka 0.9.0。
我试图将批处理Spark作业的输出推送到kafka。该作业应每小时运行一次,但不是作为流处理。
在网上寻找答案时,我只能找到与Spark流集成相关的kafka,没有关于批处理作业集成的内容。
有人知道这种事情是否可行吗?
谢谢
更新:
如用户8371915所述,我尝试按照 Writing the output of Batch Queries to Kafka中的步骤进行操作。
我使用了一个spark shell:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

这是我尝试的简单代码:

val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()

但我收到了错误提示:

java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided

有任何想法这是与什么相关的吗?

谢谢

3个回答

10

简而言之:您正在使用过时的Spark版本。写入在2.2及更高版本中启用。

开箱即用,您可以使用Kafka SQL连接器(与Structured Streaming使用相同)。请包含以下内容:

  • 在您的依赖项中添加spark-sql-kafka
  • 将数据转换为至少包含value列的DataFrame,该列类型为StringTypeBinaryType
  • 将数据写入Kafka:

df   
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", server)
  .save()

请参考Structured Streaming文档获取详细信息(从将批次查询的输出写入Kafka开始)。


4

如果你有一个数据框并且想将它写入kafka主题,你需要先将列转换为包含json格式数据的"value"列。在scala中,可以通过以下方式实现:

import org.apache.spark.sql.functions._

val kafkaServer: String = "localhost:9092"
val topicSampleName: String = "kafkatopic"

df.select(to_json(struct("*")).as("value"))
  .selectExpr("CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("topic", topicSampleName)
  .save()

这就是我要找的答案。如何将由多个列组成的数据框转换为一个名为“value”的列的数据框。谢谢。 - Onur Demir
当我在本地笔记本电脑上的Spark Shell中运行此命令时,代码会一直挂起。有任何想法吗? - chendu
是的,它会挂起。你最好尝试 df2=df.cahce() ; df2.show() ; 然后将上面的命令从 df 替换为 df2;这样可以加快速度。 - chendu

0
针对这个错误 java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider不允许创建表作为选择。 在scala.sys.package$.error(package.scala:27)

我认为您需要将消息解析为键值对。您的数据框应该有一个值列。

假设您有一个包含学生ID和分数的数据框。

df.show()
>> student_id | scores
    1         |  99.00
    2         |  98.00

那么你应该修改你的数据框为

value
{"student_id":1,"score":99.00}
{"student_id":2,"score":98.00}

要进行转换,你可以使用类似于此的代码

df.select(to_json(struct($"student_id",$"score")).alias("value"))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接