Kafka Streams在Message Hub上的KTable配置错误

7

此问题现在已在Message Hub上解决。

我在使用Kafka创建KTable时遇到了一些麻烦。我是Kafka的新手,这可能是我的问题根源,但我仍然想在这里寻求帮助。我的项目需要通过计算总出现次数来跟踪不同ID。我正在使用IBM Cloud上的Message Hub来管理我的主题,目前为止它运行得非常好。

我在Message Hub上有一个主题,它会生成像{"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}这样的消息,目前唯一相关的键是ID。

我的Kafka代码以及Streams配置如下:

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();

当我运行代码时,我遇到了以下错误:
“KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition”话题创建失败。
导致该错误的原因是: 无法创建主题,因为配置文件中包含不允许的参数。只有以下参数是允许的:[retention.ms, cleanup.policy]。
我不知道这个错误出现的原因,也不知道应该怎么办。我的KStream和KTable是否构建有误?还是bluemix上的消息中心有问题?
解决方法如下:
从下面正确答案的评论中提取了一些内容。事实证明,我的StreamsConfig没有问题,而且目前在Message Hub方面存在问题,但有一个解决方法:
原来,Message Hub在使用Kafka Streams 1.1创建repartition topics时存在问题。在我们修复此问题之前,您需要手动创建主题“KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition”。它需要与您的输入主题(myTopic)具有相同数量的分区,并将保留时间设置为最大值。一旦修复,我会再发一条评论。
非常感谢您的帮助!

你能添加你的Kafka Streams应用程序配置(或属性文件)吗? - Mickael Maison
1个回答

5

创建主题时,Message Hub有一些限制条件。详见此处

从你收到的PolicyViolationException异常来看,你的Streams应用程序似乎尝试使用了我们不允许的一些配置:

  • segment.index.bytes
  • segment.bytes
  • segment.ms

我猜你在Streams配置中设置了这些参数,需要将其删除。

请注意,在配置文件中,还需将StreamsConfig.REPLICATION_FACTOR_CONFIG设置为3,以便与Message Hub配合使用。如我们文档所述。


谢谢您的回复!我认为您是对的。我已经添加了我的流配置,但我认为使其与Message Hub(MH)配合工作所需的必要条件已经就位。至少根据文档是这样的。我不明白的是,为什么我会收到“无法创建主题”的消息,除非KTable算作一个主题?是否可以通过在MH上分配主题来解决这个问题?正如我在问题开头写的那样,我想计算来自MH主题中ID的出现次数,这是否与MH有更多关系,而不仅仅是监听主题? - jawwe
是的,您的转换逻辑将创建“内部”主题,请参见http://kafka.apache.org/11/documentation/streams/developer-guide/manage-topics.html#streams-developer-guide-topics-internal。您可以提前手动预先创建它们,但通常更容易让Streams自己处理。 否则,我认为您的逻辑看起来很好。 - Mickael Maison
我明白了。当我尝试创建一个KTable时,由于我受到Message Hub的限制,所以出现了错误。如果我的代码是从MH主题检索消息并尝试在MH上创建一个内部主题(KTable),那么我理解得对吗?是否有一种方法可以在流中修改我的代码以在其他地方创建KTable?还是我需要另外一个Kafka服务器才能实现我想要的效果?MH能处理KTables吗?非常感谢您的帮助,抱歉问题这么多。 - jawwe
1
我成功地重现了你的问题。原来在使用Kafka Streams 1.1创建重新分区主题时,Message Hub存在问题。在我们修复此问题之前,您需要手动创建名为KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition的主题。它需要与输入主题(myTopic)一样多的分区,并将保留时间设置为最大值。一旦问题解决,我会再次发表评论。 - Mickael Maison
1
我完全忘记在这里发布了,但这个问题几周前已经修复了。 - Mickael Maison

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接