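On the project I'm currently working on, we have four Spring-based applications that read from and write to various Kafka topics. Because the volume of data being sent is large, we decided to enable LZ4 compression. This works without any problems for three of the applications but causes a major problem for the fourth.
The flow is as follows: application A receives HTTP requests and converts the payload into Kafka messages. Application B reads the messages from application A, does some processing on them, and sends them to two different topics to be consumed by applications C and D. We enabled compression on the producers of A and B by setting the compression type configuration to lz4. We then ran a test, and messages flowed correctly from A to B and from B to D. However, when application C receives a compressed message, the following error starts being logged continuously: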
java.lang.IllegalAccessError: failed to access class net.jpountz.lz4.LZ4SafeUtils (class loader System@7452) from class net.jpountz.lz4.LZ4JavaSafeSafeDecompressor (class loader org.springframework.boot.loader.LaunchedURLClassLoader@11104)
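I'm not very familiar with class loaders, but as far as I can tell, although the two classes belong to the same package, they appear to have been loaded by different class loaders, which causes the access error?
The spring-kafka version in use is 2.7.3, and lz4-java is version 1.7.1, brought in as a dependency of spring-kafka.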
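For context, here is a minimal sketch of what the producer-side compression setting amounts to, written as plain properties so that it needs nothing beyond the JDK. The class name, method name, serializer choices, and broker address are assumptions made for illustration; in our setup the real beans are created by the shared library, which is not shown here. The only part taken from the setup described above is setting compression.type to lz4.

```java
import java.util.Properties;

public class ProducerCompressionSketch {

    /**
     * Builds producer properties with LZ4 compression enabled.
     * The serializer classes below are illustrative assumptions.
     */
    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // The setting that turns on LZ4 compression for batches sent by the producer.
        props.put("compression.type", "lz4");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = producerProps("localhost:9092");
        System.out.println("compression.type=" + props.getProperty("compression.type"));
    }
}
```

Nothing is configured on the consumer side for this: the consumer decompresses batches based on what the producer wrote, which is why the failure only shows up once application C fetches a compressed batch.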
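I don't understand why this happens in only one of the applications and not in the others, since all of the configuration is identical (it is shared and loaded through a common library that all four applications use to create the required beans). Initially, the application with the problem also imported another library that used a modified version of lz4-java, so we assumed that was the cause. However, even after removing that library completely, we still get the same error shown above.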
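One way to check whether a second copy of the lz4 classes is still visible at runtime is to ask the class loader for every location that provides the class file, and to print which loader actually defined the class. The sketch below uses only JDK APIs (ClassLoader.getResources, Class.getProtectionDomain); the class name ClassOriginSketch and its helper methods are hypothetical, and it would have to run inside the affected application (for example from a startup hook) to reflect that application's class path.

```java
import java.io.IOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

public class ClassOriginSketch {

    /** Converts a binary class name into the resource path used by class loaders. */
    public static String resourcePath(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Lists every location from which the loader (and its parents) can see the class file. */
    public static List<URL> findCopies(ClassLoader loader, String className) throws IOException {
        return Collections.list(loader.getResources(resourcePath(className)));
    }

    /** Describes which loader defined the class and where its bytes came from. */
    public static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader(); // null means the bootstrap loader
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String loaderName = (loader == null) ? "bootstrap" : loader.toString();
        String location = (source == null || source.getLocation() == null)
                ? "unknown" : source.getLocation().toString();
        return type.getName() + " loaded by " + loaderName + " from " + location;
    }

    public static void main(String[] args) throws Exception {
        String name = args.length > 0 ? args[0] : "net.jpountz.lz4.LZ4SafeUtils";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        // More than one entry here means the class is present in several places.
        for (URL url : findCopies(loader, name)) {
            System.out.println("copy: " + url);
        }
        try {
            // initialize=false: load the class without running its static initializers.
            System.out.println(describe(Class.forName(name, false, loader)));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not visible to " + loader);
        }
    }
}
```

If this prints more than one "copy:" line for net.jpountz.lz4.LZ4SafeUtils, or if LZ4SafeUtils and LZ4JavaSafeSafeDecompressor report different loaders, that would match the two loaders named in the error (System and LaunchedURLClassLoader) and point at a copy of the lz4 classes sitting outside the Spring Boot jar.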
Has anyone run into a similar problem, or can anyone give me a hint about what to try next to resolve it?
Thanks.
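Edit: The application is running in the cloud, and this is the stack trace seen in the application logs. The following block keeps repeating until the application is shut down or the compressed messages are removed from the topic it is listening to.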
[container-1-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.KafkaException: Received exception when fetching the next record from ETD-NewUserContextSystemData.ibsoetdcfsubacc4.eb419dde-795b-48a2-977a-8117ac15cb4e-2. If needed, please seek past the record to continue consumption.
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:79) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1603) [spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) [spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_301]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_301]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at java.lang.Thread.run(Thread.java:836) [?:1.8.0_301]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT Caused by: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from ETD-NewUserContextSystemData.ibsoetdcfsubacc4.eb419dde-795b-48a2-977a-8117ac15cb4e-2. If needed, please seek past the record to continue consumption.
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1611) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT ... 3 more
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalAccessError: failed to access class net.jpountz.lz4.LZ4SafeUtils (class loader System@7452) from class net.jpountz.lz4.LZ4JavaSafeSafeDecompressor (class loader org.springframework.boot.loader.LaunchedURLClassLoader@11104)
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:261) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:346) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1554) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1591) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT ... 3 more
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT Caused by: java.lang.IllegalAccessError: failed to access class net.jpountz.lz4.LZ4SafeUtils (class loader System@7452) from class net.jpountz.lz4.LZ4JavaSafeSafeDecompressor (class loader org.springframework.boot.loader.LaunchedURLClassLoader@11104)
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at net.jpountz.lz4.LZ4JavaSafeSafeDecompressor.decompress(LZ4JavaSafeSafeDecompressor.java:71) ~[lz4-java-1.7.1.jar:2.9.16-38a800d98fb18a51ecde14f570bbf28d81b66bbc]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at net.jpountz.lz4.LZ4SafeDecompressor.decompress(LZ4SafeDecompressor.java:74) ~[lz4-java-1.7.1.jar:2.9.16-38a800d98fb18a51ecde14f570bbf28d81b66bbc]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at net.jpountz.lz4.LZ4Factory.<init>(LZ4Factory.java:214) ~[lz4-java-1.7.1.jar:2.9.16-38a800d98fb18a51ecde14f570bbf28d81b66bbc]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at net.jpountz.lz4.LZ4Factory.instance(LZ4Factory.java:51) ~[lz4-java-1.7.1.jar:2.9.16-38a800d98fb18a51ecde14f570bbf28d81b66bbc]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at net.jpountz.lz4.LZ4Factory.safeInstance(LZ4Factory.java:105) ~[lz4-java-1.7.1.jar:2.9.16-38a800d98fb18a51ecde14f570bbf28d81b66bbc]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at net.jpountz.lz4.LZ4Factory.fastestJavaInstance(LZ4Factory.java:141) ~[lz4-java-1.7.1.jar:2.9.16-38a800d98fb18a51ecde14f570bbf28d81b66bbc]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at net.jpountz.lz4.LZ4Factory.fastestInstance(LZ4Factory.java:169) ~[lz4-java-1.7.1.jar:2.9.16-38a800d98fb18a51ecde14f570bbf28d81b66bbc]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<clinit>(KafkaLZ4BlockInputStream.java:50) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:261) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:346) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1554) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1591) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.2.jar:?]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.3.jar:2.7.3]
2021-09-24T10:42:29.30+0300 [APP/PROC/WEB/1] OUT ... 3 more