我已经设置好了Spark Structured Streaming(Spark 2.3.2)来从Kafka(2.0.0)中读取。如果消息在启动Spark流式作业之前进入主题,则无法从主题的开头进行消费。这是否是Spark Stream忽略在初始化运行Spark Stream作业之前生成的Kafka消息的预期行为(即使使用.option("startingOffsets","earliest"))?
重现步骤
在启动流式作业之前,创建名为
test
的主题(单个broker、单个分区),并向该主题发送消息(本例中有3条消息)。使用以下命令启动spark-shell:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/
执行下面的Spark Scala代码。
// Local
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("failOnDataLoss","false")
.option("stratingOffsets","earliest")
.option("subscribe", "test")
.load()
// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
.trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
.start()
期望输出与实际输出
我期望流从offset=1开始,但它却从offset=3开始读取。你可以看到kafka客户端实际上重置了起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
我发现Spark streaming处理我在启动流式作业后生产的消息。
这是Spark streaming预期的行为吗?即使使用.option("startingOffsets","earliest")
,它是否忽略在初始运行Spark Stream作业之前生成的Kafka消息?
2019-06-18 21:22:57 INFO AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
2019-06-18 21:22:57 INFO AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
2019-06-18 21:22:57 INFO MicroBatchExecution:54 - Starting new streaming query.
2019-06-18 21:22:57 INFO Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
2019-06-18 21:22:57 INFO AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
2019-06-18 21:22:57 INFO ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
2019-06-18 21:22:57 INFO AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
2019-06-18 21:22:57 INFO AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
2019-06-18 21:22:57 INFO ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO KafkaSource:54 - Initial offsets: {"test":{"0":3}}
2019-06-18 21:22:58 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
2019-06-18 21:22:58 INFO KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}
Spark批处理模式
我能够确认批处理模式从头开始读取 - 因此Kafka保留配置没有问题。
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("subscribe", "test")
.load()
df.count // Long = 3
group.id
对这个问题没有任何影响。它对 API 用户是透明的。请参考 Spark 文档(https://spark.apache.org/docs/2.3.2/structured-streaming-kafka-integration.html)中有关group.id
的以下注释: “Kafka 源将自动为每个查询创建一个唯一的组 ID。” - Daniel Ahn