一个Kafka消费者是否可以在从主题中轮询所有消息之前过滤消息?

20

据说消费者只能读取整个主题。无法在经纪人上执行评估以过滤消息。

这意味着我们必须从主题中接收/消费所有消息,并在客户端过滤它们。

这太过繁琐了。我在想是否可以基于已传递给经纪人的某些信息(例如消息键或其他内容)来过滤和接收特定类型的消息。

从方法Consumer.poll(timeout)来看,似乎没有额外的操作。


你的主题应该只包含一种类型的消息。重点是,如果在将数据插入到单独的主题时进行过滤,那么从每个主题消耗数据应该非常简单。 - Infamous
如果消费者数量很大,比如一百万,并且Kafka被用作消费者之间通信的管道,那么为每个消费者创建一个特定主题就不是一个解决方案了,对吧? - mikej1688
不。我对消费者的理解是,消费者是愚笨的;这意味着消费者只是听取特定主题并处理Kafka队列中出现的任何内容。一旦你开始将Kafka作为这些所谓“消费者”之间的“通信管道”,你就会遇到麻烦。如果您想要管理数百万个组件之间的通信,则需要一个ESB。 - Infamous
抱歉,什么是ESB? - mikej1688
从我的角度来看,原始问题已经在下面的回答中得到了解答。请标记正确的答案或提供反馈,说明为什么没有得到解答。谢谢! :) - dbustosp
4个回答

8

不,使用消费者时不仅可以从主题接收某些消息。消费者按顺序获取所有消息。

如果您不想在消费者中筛选消息,则可以使用流式作业。例如,Streams将从您的主题中读取,并仅将消费者感兴趣的消息推送到另一个主题中。然后消费者可以订阅此新主题。


3
根据我的理解,即使使用流,数据仍将被传输到客户端。 - dbustosp
不确定你的流程是什么样子的,但在某些情况下,您可以在相对便宜的集群附近运行Streams应用程序以消费和写回Kafka。 - Mickael Maison
不确定如何支持,如果您能给我一个例子,我将不胜感激 :) - dbustosp
谢谢。Kafka支持发布/订阅模型。我的原始目标是利用其高吞吐量,同时将其作为消费者(即客户端)之间通信的管道。这可能不是使用Kafka的正确方式。 - mikej1688
https://issues.apache.org/jira/browse/KAFKA-6020 - Nikolay Dimitrov

4
每个Kafka主题应该包含逻辑上相似的消息,以保持主题一致性。有时候你可能会遇到一个主题,比如说水果,其中包含水果的不同属性(可能是json格式)。生产者可能会推送不同的水果消息,但是你希望其中一个消费者组只处理苹果。理想情况下,你可能会选择用具体的水果名称作为主题名,但是假设由于某些原因(比如太多主题),这种方法行不通。在这种情况下,你可以覆盖Kafka中的默认分区方案,忽略key并进行随机分区,然后通过生产者中的partitioner.class属性传递自定义的分区器类,把水果名称放在消息key中。这是必需的,因为默认情况下,如果你在发送消息时输入了key,它将永远进入同一个分区,这可能会导致分区不平衡。
这背后的思想是,有时候如果你的Kafka消息值是一个复杂对象(json、avro记录等),基于key过滤记录可能比解析整个值并提取所需字段更快。我现在没有任何数据来支持这种方法的性能优势,只是一种直觉。

也许一个点对点的消息传递应用会比Kafka更加合适。 - mikej1688

2

一旦记录被推送到Kafka集群中,你就无法做太多事情了。无论你想要过滤什么,你都必须将数据块带到客户端。

不幸的是,唯一的选择是将该逻辑传递给生产者,这样你就可以根据你定义的特定逻辑将数据推送到多个主题中。


同意。一旦订阅了主题,消费者将接受主题中的所有内容,并在消费者端进行过滤处理。这种方法的缺点是浪费网络带宽。 - mikej1688
@mikej1688 在消费者端你无能为力。但是你可以采用其他策略来节省网络带宽。 - dbustosp
Solace消息应用程序似乎可以做更多的自定义内容。这是他们的口号,“消息路由、过滤和排序”。 - mikej1688

0
Kafka Consumer会接收来自主题的所有消息。但是,如果有任何自定义的消息类型(MyMessage)只需要被消费,那么可以在反序列化类中进行过滤。如果消费者收到了两种类型的消息,比如字符串和MyMessage,那么字符串类型的消息将被忽略,只处理MyMessage类型的消息。
public class MyMessageDeserializer implements Deserializer<MyMessage> {

@Override
public MyMessage deserialize(String topic, byte[] data) {
    try {
        if (data == null){
            logger.info("Null received at deserializing");
            return null;
        }
        return objectMapper.readValue(new String(data, "UTF-8"), MyMessage.class);
    } catch (Exception e) {
        logger.error("Deserialization exception: " + e.getMessage());
    }
    return null;
}
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接