我们有多个应用程序同时监听同一个Kafka主题,生产者在向主题发送消息时会设置消息头,以便特定的实例能够评估消息头并处理消息。例如:
在Spring Cloud Stream 3.0.0中,@StreamListener已经被废弃,而在Function中我找不到与condition属性相对应的等效选项。有任何建议吗?
@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
log.info("sydney order request received message is {}",message.getName());
}
在Spring Cloud Stream 3.0.0中,@StreamListener已经被废弃,而在Function中我找不到与condition属性相对应的等效选项。有任何建议吗?
.intercept()
方法将无法将记录传递给消费者,因此这将非常有用。https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/listener/RecordInterceptor.html - Nerm