据说消费者只能读取整个主题。无法在经纪人上执行评估以过滤消息。
这意味着我们必须从主题中接收/消费所有消息,并在客户端过滤它们。
这太过繁琐了。我在想是否可以基于已传递给经纪人的某些信息(例如消息键或其他内容)来过滤和接收特定类型的消息。
从方法Consumer.poll(timeout)来看,似乎没有额外的操作。
据说消费者只能读取整个主题。无法在经纪人上执行评估以过滤消息。
这意味着我们必须从主题中接收/消费所有消息,并在客户端过滤它们。
这太过繁琐了。我在想是否可以基于已传递给经纪人的某些信息(例如消息键或其他内容)来过滤和接收特定类型的消息。
从方法Consumer.poll(timeout)来看,似乎没有额外的操作。
不,使用消费者时不仅可以从主题接收某些消息。消费者按顺序获取所有消息。
如果您不想在消费者中筛选消息,则可以使用流式作业。例如,Streams将从您的主题中读取,并仅将消费者感兴趣的消息推送到另一个主题中。然后消费者可以订阅此新主题。
一旦记录被推送到Kafka集群中,你就无法做太多事情了。无论你想要过滤什么,你都必须将数据块带到客户端。
不幸的是,唯一的选择是将该逻辑传递给生产者,这样你就可以根据你定义的特定逻辑将数据推送到多个主题中。
public class MyMessageDeserializer implements Deserializer<MyMessage> {
@Override
public MyMessage deserialize(String topic, byte[] data) {
try {
if (data == null){
logger.info("Null received at deserializing");
return null;
}
return objectMapper.readValue(new String(data, "UTF-8"), MyMessage.class);
} catch (Exception e) {
logger.error("Deserialization exception: " + e.getMessage());
}
return null;
}
}