KafkaConsumer Java API的subscribe()与assign()的区别

22

我刚学习Kafka Java API,正在尝试消费特定Kafka主题的记录。

我知道可以使用subscribe()方法从主题开始轮询记录。如果要从选定分区的主题开始轮询记录,Kafka还提供了assign()方法。

我想了解这两个方法之间是否是唯一的区别?


1
“Subscribe” 使用消费者组;Kafka 协调器将分配发送给消费者和订阅主题的分区,将分布在该组内的实例中。“Assign” 强制分配到一系列主题。 - Mohsen
谢谢,@Spara。明白了。如果我不提供GroupId,subscribe()会抛出InvalidGroupIdException,但assign()可以正常工作。那么使用assign()的每个进程都是独立的,并且将接收来自主题分区的所有记录? - Karan Khanna
2
@Spara您的评论暗示了assign不能使用消费者组,实际上是可以的。该组将不用于协调,而是用于偏移跟踪。 - Scott Carey
2个回答

27

是的,subscribe需要group.id,因为组内的每个消费者将动态分配到提供的话题列表的分区,并且每个分区可以由组中的一个消费者线程消费。这是通过在消费者组的所有成员之间平衡分区来实现的,以便每个分区恰好分配给该组中的一个消费者。

assign会手动将一系列分区分配给此消费者。而此方法不使用消费者组管理功能(无需group.id

主要区别在于,assign(Collection)将失去对动态分区分配和消费者组协调的控制。

消费者还可以使用assign(Collection)手动分配特定的分区(类似于旧版“简单”消费者)。在这种情况下,将禁用动态分区分配和消费者组协调。

订阅

public void subscribe(java.util.Collection<java.lang.String> topics)

subscribe()方法订阅给定的主题列表以动态分配分区,如果给定的主题列表为空,则视为unsubscribe()方法的相同。

作为组管理的一部分,消费者将跟踪属于特定组的消费者列表,并在以下事件之一触发时触发重新平衡操作 -

Number of partitions change for any of the subscribed list of topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to an existing consumer group via the join API

分配

public void assign(java.util.Collection<TopicPartition> partitions)

assign方法手动将一组分区分配给此消费者。如果给定的主题分区列表为空,则视为取消订阅。

通过此方法进行的手动主题分配不使用消费者的组管理功能。因此,在组成员资格、集群和主题元数据更改时,不会触发重新平衡操作。


也许在字里行间,但对于那些思考这是否意味着在使用assign()时没有任何消费者重新平衡监听器会触发的人来说。 - PragmaticProgrammer
使用assign时,您将失去消费者组协调。@PragmaticProgrammer - Ryuzaki L
9
值得注意的是,assign() 可以使用 group.id 存储已提交的偏移量。有一个 assign() 消费者可以使用自己的专用 group.id 跟踪自己的偏移量,而不需要与其他消费者协调。在某些情况下,这样做会带来一些好处。 - Scott Carey
如果两个具有相同组ID的消费者订阅单个分区,会发生什么?在这种情况下,自动提交是如何工作的?虽然这不是理想的情况,但事情在这种情况下是如何运作的? - Vinodini Natrajan
1
不可能,只有其中一个消费者会从该分区消费。 - Nerm
@RyuzakiL 在使用assign()时,如何在启动消费者时正确进行查找?在使用subscribe()的情况下,我们可以利用ConsumerRebalanceListeneronPartitionsAssigned将消费者定位到所需位置。 - tuk

0

我想为没有group.id的消费者添加一些有用的信息。如果没有框架的花招(KafkaClient库+Java),这个属性没有默认值。虽然不是官方的说法,但通常称之为“自由消费者”。自由消费者不订阅主题,因此需要分配主题分区。

正如上面所提到的,自动分区分配、重新平衡、偏移持久化、分区排他性、消费者心跳和故障检测/活性(所有这些都是消费者组的特点)在这些自由消费者中都被抛弃了。因此,客户端(即您)需要跟踪应用程序与Kafka相关的任何状态,包括跟踪偏移量(例如使用Map)。这是因为自由消费者不会将其偏移量提交给Kafka,通常会使用自己的存储机制。


关于提交/跟踪偏移量:Scott Carey注意到,通过使用group.id,仍然可以在消费者组级别上自动处理偏移量。 - Juozas
1
没错 - 答案中的第一行说没有 group.id 的消费者。如果没有它,主题分发功能(将分区均匀分配给组中的消费者)是不可能实现的。 - Nerm
@Nerm 如果我有两个“自由消费者”分配到同一个主题(假设分区数为1),并且将auto.offset.reset设置为earliest,那么它们都保证能够获取所有的消息吗? - tuk
1
通常称为“空闲/未分配”。我看到的所有文档都没有说“自由消费者”。 - OneCricketeer
@OneCricketeer 是的先生。免费使用的消费者。我同意使用group.id=null创建KafkaConsumer是一个空闲/未分配的消费者/自由消费者。一旦调用了freeConsumer.assign(tp),它就不再是未分配的“自由”消费者。一旦这个“自由”消费者调用.poll(),它就不再是空闲的消费者了。———关于“没有看到任何文档称之为'自由消费者'”,Emil Koutanov在他的书《Effective Kafka》中提到了这样的说法。 - Nerm
1
@tuk 如果 group.id=null,并且自由消费者已被分配到 TopicParition,则应在分配后调用 .seekToBeginning(tp) 以确保消费者获取所有未过期的消息。我有一个证明这一点的例子在这里。在 cfk 目录下,将部署扩展/缩小 x 次数不会破坏保证。 - Nerm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接