Kafka:如何在Kafka中实现轮询分区?

3

我是kafka的新手。我的需求是,我有两个分区,例如Partition-0和Partition-1,我有一个值列表,其中还包含KEY值。我想按照我的键存储数据,例如key-1将进入Partition-0,key-2将进入Partition-1。使用旧的API可以通过实现Partition接口来实现,但我如何在新的API中做到这一点。谢谢。

4个回答


3
如果您想要循环调度行为,请在向生产者写入时不传递键,DefaultPartitioner将为您完成工作。您不需要编写自定义实现。从javadoc中可以看到:
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */

0

@AlpcanYıldız 不确定您所说的Kafka流属性是什么意思,但它是一种以生产者为中心的配置。您可以阅读有关KafkaStreams的信息,该信息表明它在底层使用普通的生产者来编写处理后的流数据-https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java - ha9u63a7
我想问一下,在kafka stream属性中,我可以使用默认的分区器吗?我知道kafka streams使用默认的普通分区器。例如,我可以这样说:props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, streamingConfig.getStandbyReplicas()),因为它是一个StreamsConfig。但是我可以这样说吗:props.put(Pro.NUM_STANDBY_REPLICAS_CONFIG, streamingConfig.getStandbyReplicas())?我能使用这个吗?props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class)。 - Alpcan Yıldız
请问您能否检查一下我的问题?因为Matthias说我不能使用这个配置,但实际上我可以(https://stackoverflow.com/questions/59645127/kafka-streams-roundrobinpartitioner/59658438?noredirect=1#comment105539367_59658438)。如果我在2.4客户端和2.2版本的kafka上使用RoundRobinPartitioner,生产者将使用roundrobin设置,但我无法使用所有内部主题的分区。它将我的消息分配到大约30或40个分区中的50个分区。 - Alpcan Yıldız
@AlpcanYıldız Matthias 的意思是 - 你不能直接使用它。我建议你检查 DefaultStreamPartitioner 类,并查看它如何在内部使用 DefaultPartitioner 实例。你的自定义实现将简单地实现 StreamPartitioner 并在内部使用 RoundRobinPartitioner 来完成它。一个警告 - 要注意排序。此外,他在第二段确切地提到了这一点。底线 - 你不能通过 Producer.properties 直接使用它。 - ha9u63a7
如果我不能像这样使用with,你知道我该如何实现这一点,以便我的kafka-dsl拓扑在所有内部主题中使用轮询分区器吗? - Alpcan Yıldız

0

你可以通过覆盖kafka生产者的默认分区器 以轮询的方式向kafka生产。

一个伪代码实现:

class RRPartitioner():
      def __init__():
            # Using topic metadata get total number of partitions
            self.total_partitions = client[topic].get_number_partitions()
            self.part_offset = 0

      def partitioner(self, key, msg):
          if self.part_offset > self.total_partitions:
              self.part_offset = 0
              return self.part_offset
          else:
              self.part_offset += 1
              return self.part_offset

以上实现是纯轮询,如果您想按键排序并进行轮询,则需要在自定义分区器中进行更多操作。

这是最简单的解决方案,但如果您在运行时添加分区,则无法使用该解决方案,这是完全有效的情况。 - serejja
是的,您需要定期重新启动生产者或轮询以获取元数据更改。但是,大多数现有的键控生产者都会遇到同样的问题,如果我没记错的话。 - Sudev Ambadi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接