ClassCastException when sending failed messages to a Kafka DLQ


I am using Spring Cloud Stream with Kafka. My listener code looks like this:

    @Bean
    public Consumer<List<Map<String, Object>>> receive() throws MyException {
        return message -> {
            try {
                messageService.processMessage(message);
            } catch (MyException e) {
                throw new MyException(e.getMessage());
            }
        };
    }

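I recently changed it to consume messages in batch mode. If processing fails, I need to send the messages to a DLQ topic. Currently they just go to the error channel.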

    @StreamListener("errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        log.error("exception occurred. errorMessage = {}", errorMessage);
    }

But I get a ClassCastException, with the following stack trace:

2020-04-02 15:34:21.841 ERROR 22222 --- [container-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.group-1.errors'; nested exception is java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app'), failedMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking MessageListener#receive[1 args]; nested exception is com.corelogic.idap.idappipelinegenericelasticsink.exception.ElasticClientException: forced, failedMessage=GenericMessage [payload=[[B@1701c9b1], headers={kafka_offset=[36], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@ce0748, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[test], kafka_receivedTimestamp=[1585856056860], contentType=application/json, kafka_groupId=group-1}], headers={kafka_data=[ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)], id=0cb01a1e-fd22-f409-bcbc-1f86880efeec, sourceData=[ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)], timestamp=1585856060836}] for original GenericMessage [payload=[[B@1701c9b1], headers={kafka_offset=[36], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@ce0748, kafka_timestampType=[CREATE_TIME], 
kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[test], kafka_receivedTimestamp=[1585856056860], contentType=application/json, kafka_groupId=group-1}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1777) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:1501) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1383) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1254) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1007) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.group-1.errors'; nested exception is java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app')
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:483) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:217) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:201) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:527) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:497) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1475) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1438) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1370) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    ... 8 common frames omitted
Caused by: java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app')
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$8(KafkaMessageChannelBinder.java:1057) ~[spring-cloud-stream-binder-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

My configuration looks like this:

spring.cloud.stream.bindings:
  input:
    destination: test
    group: ${application.group}
    consumer:
      max-attempts: 5
      batch-mode: true
  output:
    contentType: application/json

spring.cloud.stream.kafka.bindings:
  input:
    consumer:
      enableDlq: true
      dlqName: dead-topic
      autoCommitOnError: true
      autoCommitOffset: true
      republish-to-dlq: true
      dlqProducerProperties:
        configuration:
          key.serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
          value.serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

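There is a similar question, "Kafka Consumer- ClassCastException java", but it did not help much.
Do I need to write my own ListSerde, as was done in the question "Issue with ArrayList Serde in Kafka Streams API", or wait for the standard ListSerde PR to be completed?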
1 Answer

DLQ is not currently supported for batch listeners; please open an issue on GitHub against spring-cloud-stream-binder-kafka.

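Bear in mind that the framework has no way of knowing which record in the batch failed, so every record in the batch would be sent to the DLQ, even those that were processed successfully.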

In batch mode it is generally better to handle errors inside the listener itself.
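One way to handle errors inside the listener is sketched below in plain Java, with no Spring or Kafka types. It assumes records can be processed one at a time. `BatchErrorHandling`, `perRecord`, and `deadLetterSink` are hypothetical names for illustration and are not part of Spring Cloud Stream. In a real application the sink would publish the failed record to the dead-letter topic (for example `dead-topic`) through whatever producer the application already has.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical helper for illustration; not a Spring Cloud Stream API. */
final class BatchErrorHandling {

    private BatchErrorHandling() {
    }

    /**
     * Wraps a per-record processor into a batch consumer that does not propagate
     * processing failures. Each record is tried up to maxAttempts times; a record
     * that still fails is passed to deadLetterSink with the last exception, and
     * the loop continues with the next record in the batch.
     */
    static <T> Consumer<List<T>> perRecord(Consumer<T> processor,
                                           int maxAttempts,
                                           BiConsumer<T, RuntimeException> deadLetterSink) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return batch -> {
            for (T record : batch) {
                RuntimeException lastFailure = null;
                for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                    try {
                        processor.accept(record);
                        lastFailure = null; // success: clear any earlier failure
                        break;
                    } catch (RuntimeException e) {
                        lastFailure = e;    // remember it and retry if attempts remain
                    }
                }
                if (lastFailure != null) {
                    // Assumption: the sink publishes to the DLQ topic and does not throw.
                    deadLetterSink.accept(record, lastFailure);
                }
            }
        };
    }
}
```

Because the listener knows which record failed, only that record reaches the dead-letter sink, and the records that succeeded are not republished. The `receive()` bean in the question could return such a wrapper around a per-record version of `messageService.processMessage`, assuming that method can be called with single records.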


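Yes. I had been wondering how batch processing copes with a mix of failed and successful records; that is the next thing I want to look into. Either way, handling errors in the listener in batch mode makes sense. I will open an issue against spring-cloud-stream-binder-kafka. Thanks! - afzal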
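@afzal, could you share a link to the issue, if there is one? It looks like I am hitting the same error with a batch consumer and a dead-letter queue. Thanks. - daoway
@daoway I did not create an issue for this. I ended up retrying manually and publishing the failed messages to the dead-letter queue myself. - afzal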
I opened this issue today: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1206 - Sébastien Nussbaumer
@SébastienNussbaumer That issue has been closed and superseded by this one: https://github.com/spring-cloud/spring-cloud-stream/issues/2317 - Emmanuel
This feature is now available in Spring Cloud Stream. See this link for details - https://github.com/spring-cloud/spring-cloud-stream/issues/2317#issuecomment-1428716481 - sobychacko
