在Spark批处理作业中读取Kafka主题

7

我正在编写一个 Spark (v1.6.0) 批处理作业,用于从 Kafka 主题中读取数据。
我可以使用 org.apache.spark.streaming.kafka.KafkaUtils#createRDD,但是我需要为所有分区设置偏移量,并且还需要将它们存储在某个地方(ZK?HDFS?)以便知道下一个批处理作业从哪里开始。

在批处理作业中从 Kafka 中读取数据的正确方法是什么?

我也考虑编写一个流处理作业,该作业从 auto.offset.reset=smallest 开始读取并将检查点保存到 HDFS,然后在下一次运行时从该处开始。

但是,在这种情况下,我如何只获取一次并在第一批处理之后停止流式处理呢?


1
最好分成两个单独的问题。 - maasg
1个回答

4

createRDD 是从 kafka 中读取批次的正确方法。

要查询有关最新/最早可用偏移量的信息,请查看 KafkaCluster.scala 的方法 getLatestLeaderOffsetsgetEarliestLeaderOffsets。该文件以前是 private 的,但在最新版本的 spark 中应该是 public 的。


1
不,那些方法是用于从kafka获取最新可用偏移量的。请参见https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala,了解从存储中查找已提交偏移量的示例。 - Cody Koeninger
@CodyKoeninger 在0.10版本中不存在 KafkaCuster 类型。我有什么遗漏的吗?我想批量处理主题中的所有现有记录,但没有办法从 Kafka 检索最早和最新的偏移量。 - InfinitiesLoop
@CodyKoeninger 是的,但是使用Spark Stream API,您必须首先从主题开始消费以获取这些偏移量。没有通用的kafka消费者API。我只想为每个分区启动一个批处理作业,最小和最大偏移量,而不是流式数据。您是在说我应该拉取kafka消费者工件吗?这在Spark Streaming 0.8客户端中是不必要的,但是在0.10中缺少KafkaCluster类使其他东西成为必需品。 - InfinitiesLoop
不要添加Kafka消费者artifact,它已经作为传递依赖项包含在内。直接使用即可。 - Cody Koeninger
1
你成功让那些针对Spark 1.6的get方法工作了吗?我遇到了同样的问题,现在需要在"createRDD"中提供偏移量进行查找。 - Havnar
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接