Spring Cloud @StreamListener条件已弃用,有什么替代方案?

5
我们有多个应用程序同时监听同一个Kafka主题,生产者在向主题发送消息时会设置消息头,以便特定的实例能够评估消息头并处理消息。例如:
@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
    log.info("sydney order request received message is {}",message.getName());
}

在Spring Cloud Stream 3.0.0中,@StreamListener已经被废弃,而在Function中我找不到与condition属性相对应的等效选项。有任何建议吗?
1个回答

1
尽管我也无法找到函数式方法的等效项,但我有一个建议。@StreamListener注释的条件并不能阻止应用程序必须消耗消息、读取其标头并过滤特定记录,然后将其传递给侦听器(fullfillOrder())。因此,可以安全地假设您正在消耗击中主题的每个消息(由Spring Cloud在幕后为我们实现的事件接收器),但是只有在标头==悉尼时才执行侦听器。如果有一种方法可以配置Spring Cloud使用的事件接收器(在命中侦听器之前丢弃消息),我建议研究一下。如果没有,就会在进行任何处理之前过滤掉任何消息(非悉尼)。如果您熟悉Spring Cloud的功能方法,它看起来像这样:
@Bean
public Consumer<Message<TestObj>> fulfillOrder() {
    return msg -> {
        // to get header - msg.getHeaders().get(key, valueType);
        // filter out bad messages
    }
}

或者

@Bean
public Consumer<ConsumerRecord<?, TestObj>> fulfillOrder() {
    return msg -> {
        // msg.headers().lastHeader("franchiseName").value() -> filter em out
    }
}

其他: ^ 我的代码假设您正在使用spring-cloud-stream-binder-kafka将kafka-client API与Spring Cloud Stream集成。根据列出的标签,我会注意到Spring Cloud Stream有两个版本的Kafka绑定器-一个用于kafka客户端库,另一个用于kafka流库。

不考虑Spring Cloud / Frameworks,在kafka streams中的高级DSL不提供对标头的访问,但低级处理器API可以。从示例中可以看出,您正在利用客户端绑定器而不是spring-cloud-stream-binder-kafka-streams / kafka流绑定器。我还没有看到使用低级处理器API的spring cloud stream + kafka streams binder的实现,因此无法确定是否达到了目标。


另一个选项:在消费者和代理之间注入拦截器。如果您的逻辑返回不符合标头要求的任何记录的null,则.intercept()方法将无法将记录传递给消费者,因此这将非常有用。https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/listener/RecordInterceptor.html - Nerm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接