Kafka Confluent InvalidConfigurationException: Unauthorized; error code: 401 卡夫卡Confluent无效配置异常:未授权;错误代码:401

4

我正在使用kafka Confluent schema registry中的SpecificAvroSerde读取模式,但是我遇到了以下错误:

org.apache.kafka.common.errors.InvalidConfigurationException: Unauthorized; error code: 401
[TestStreamProcess-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [TestStreamProcess] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unauthorized; error code: 401

我正在从config.properties文件中加载所需的配置,以成功连接到模式注册表:

schema.registry.url=<confluent Schema Registry URL>
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<Schema Registry API Key>:<schema Registry Secret Key>

serdesConfig已经设置好并且需要的导入已经完成:

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.*;

    public static SpecificAvroSerde<TestStream> getTestStreamSerde(Properties props) {
        SpecificAvroSerde<TestStream> testStreamSerde= new SpecificAvroSerde<>();
        testStreamSerde.configure(getSerdeProps(props), false);
        return testStreamSerde;
    }

  protected static Map<String, String> getSerdeProps(Properties props) {
        final HashMap<String, String> map = new HashMap<>();

        final String schemaUrlConfig = props.getProperty(SCHEMA_REGISTRY_URL_CONFIG);
        map.put(SCHEMA_REGISTRY_URL_CONFIG, ofNullable(schemaUrlConfig).orElse(""));
}

        final KStream<String, TestStream> testStream = builder.stream("input-topic",
                Consumed.with(String(), getTestStreamSerde(props.getProperties())));

在pom.xml中已经加载了Schema Registry artifact:

<plugin>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-maven-plugin</artifactId>
    <version>${confluent.version}</version>
</plugin>

你能帮我确定我的配置缺少了什么吗?

1个回答

0
如果您遇到此身份验证错误,请首先检查是否使用了正确的API密钥。
例如,您可能正在使用集群API密钥和密钥而不是模式注册表密钥和密钥。
集群API密钥和模式注册表API密钥是不同的值,用于在集群级别和模式注册表级别提供访问权限。它们被称为特定于资源 API密钥。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接