我是kafka的新手。我的需求是,我有两个分区,例如Partition-0和Partition-1,我有一个值列表,其中还包含KEY值。我想按照我的键存储数据,例如key-1将进入Partition-0,key-2将进入Partition-1。使用旧的API可以通过实现Partition接口来实现,但我如何在新的API中做到这一点。谢谢。
我是kafka的新手。我的需求是,我有两个分区,例如Partition-0和Partition-1,我有一个值列表,其中还包含KEY值。我想按照我的键存储数据,例如key-1将进入Partition-0,key-2将进入Partition-1。使用旧的API可以通过实现Partition接口来实现,但我如何在新的API中做到这一点。谢谢。
使用新的生产者,您还可以实现 Partitioner
接口(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java),以实现轮询分发。
您可以使用 DefaultPartitioner
作为参考 - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
DefaultPartitioner
将为您完成工作。您不需要编写自定义实现。从javadoc中可以看到:/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
从Kafka 2.4.0开始,您可以选择进行“始终”循环。
你可以通过覆盖kafka生产者的默认分区器 以轮询的方式向kafka生产。
一个伪代码实现:
class RRPartitioner():
def __init__():
# Using topic metadata get total number of partitions
self.total_partitions = client[topic].get_number_partitions()
self.part_offset = 0
def partitioner(self, key, msg):
if self.part_offset > self.total_partitions:
self.part_offset = 0
return self.part_offset
else:
self.part_offset += 1
return self.part_offset