此问题现在已在Message Hub上解决。
我在使用Kafka创建KTable时遇到了一些麻烦。我是Kafka的新手,这可能是我的问题根源,但我仍然想在这里寻求帮助。我的项目需要通过计算总出现次数来跟踪不同ID。我正在使用IBM Cloud上的Message Hub来管理我的主题,目前为止它运行得非常好。
我在Message Hub上有一个主题,它会生成像{"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}
这样的消息,目前唯一相关的键是ID。
我的Kafka代码以及Streams配置如下:
import org.apache.kafka.streams.StreamsConfig;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> Kstreams = builder.stream(myTopic);
KTable<String, Long> eventCount = Kstreams
.flatMapValues(value -> getID(value)) //function that retrieves the ID
.groupBy((key, value) -> value)
.count();
当我运行代码时,我遇到了以下错误:
“KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition”话题创建失败。
导致该错误的原因是: 无法创建主题,因为配置文件中包含不允许的参数。只有以下参数是允许的:[retention.ms, cleanup.policy]。
我不知道这个错误出现的原因,也不知道应该怎么办。我的KStream和KTable是否构建有误?还是bluemix上的消息中心有问题?
解决方法如下:
从下面正确答案的评论中提取了一些内容。事实证明,我的StreamsConfig没有问题,而且目前在Message Hub方面存在问题,但有一个解决方法:
原来,Message Hub在使用Kafka Streams 1.1创建repartition topics时存在问题。在我们修复此问题之前,您需要手动创建主题“KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition”。它需要与您的输入主题(myTopic)具有相同数量的分区,并将保留时间设置为最大值。一旦修复,我会再发一条评论。
非常感谢您的帮助!