您可以查看
priority-kafka-client来实现对主题的优先消费。
基本思路如下(摘自README):
在此上下文中,优先级是一个正整数(N),具有优先级级别
0 < 1 < ... < N-1
PriorityKafkaProducer(实现org.apache.kafka.clients.producer.Producer):
该实现接受优先级等级的附加参数
Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record)
。这表示在该优先级级别上产生记录。
Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record)
将默认记录产生在最低优先级级别0上。对于每个逻辑主题XYZ - 优先级级别0 <= i XYZ-i支持
CapacityBurstPriorityKafkaConsumer(实现org.apache.kafka.clients.consumer.Consumer):
该实现维护每个优先级级别0 <= i ABC-i将使用者绑定到Kafka主题
XYZ-i
。这与PriorityKafkaProducer一起工作。
max.poll.records
属性根据
maxPollRecordsDistributor
- 默认为
ExpMaxPollRecordsDistributor
在优先级主题使用者之间拆分。其余的KafkaConsumer配置原样传递给每个优先级主题使用者。在定义
max.partition.fetch.bytes
,
fetch.max.bytes
和
max.poll.interval.ms
时必须小心,因为这些值将在所有优先级主题使用者中使用。
基于将
max.poll.records
属性分配到每个优先级主题使用者中作为其保留容量的思想来工作。从配置了分布式
max.poll.records
值的所有优先级级别主题使用者中按顺序获取记录。分配必须向更高的优先级预留更高的容量或处理速率。
注意1-如果我们在优先级级别主题中有倾斜的分区,例如10K条记录在优先级2分区中,100条记录在优先级1分区中,10条记录在优先级0分区中,这些分区被分配给不同的使用者线程,则实现将无法跨这些使用者同步以调节容量,因此将无法遵守优先级。因此,生产者必须确保没有倾斜的分区(例如使用轮询 - 这可能意味着没有消息排序假设,使用者可以通过分离提取和处理关注点并行处理记录)。
注意2- 如果我们在优先级主题中有空分区,例如分配的2和1分区中没有挂起的记录,但是分配给相同消费者线程的优先级0分区中有10k个记录,则希望优先级0主题分区的消费者能够使其容量达到
max.poll.records
并不受限于基于
maxPollRecordsDistributor
的保留容量,否则整体容量将被浪费。
此实现将尝试解决上述注意事项。每个消费者对象将具有独立的优先级级别主题消费者,每个优先级级别消费者根据maxPollRecordsDistributor拥有基于预留容量的最大轮询记录。只要所有以下内容都为真,每个优先级级别主题消费者就会尝试突破到组中其他优先级级别主题消费者的容量:
它有资格进行扩展-这是如果在
poll()
的最后
max.poll.history.window.size
次尝试中,至少
min.poll.window.maxout.threshold
次接收到等于分配的max.poll.records的记录数,该记录数是基于maxPollRecordsDistributor分配的。这表明该分区有更多的传入记录要处理。
更高的优先级级别不能扩展-没有更高优先级级别的主题消费者根据上述逻辑有资格扩展。基本上给更高的优先级让路。
如果上述内容为真,则优先级级别主题消费者将突破到所有其他优先级级别主题消费者的容量。每个优先级级别主题消费者的突破量等于
poll()
的最后
max.poll.history.window.size
次尝试中最小未使用容量。