Kafka Streams 内部主题命名

6
根据文档 (https://docs.confluent.io/current/streams/developer-guide/manage-topics.html#internal-topics),内部主题遵循命名规则 <application.id>-<operatorName>-<suffix>
以下是一些示例:
testapplication-KSTREAM-REDUCE-STATE-STORE-0000000008-repartition  
testapplication-KSTREAM-REDUCE-STATE-STORE-0000000027-repartition  

有人知道整数是如何确定的吗?

不幸的是,我们的安全要求不允许我们使用应用程序创建主题,需要提前设置。我正在尝试确定这些主题名称是否一致。


这是一个实现细节,您不应该依赖于这些名称的生成方式。在内部,它只是一个计数器,每次生成新名称时都会增加。 - Matthias J. Sax
如果这是一个问题,你也可以命名某些运算符/存储器来在你的代码中设置名称。 - Matthias J. Sax
你怎么做到的?我们正在使用DSL API,但看不到在reduce操作中命名主题的方法。如果您能编写一个描述如何实现的答案,那就太好了! - Chris
1
所有有状态的操作符都有重载,可以传入可选参数,如MaterializedJoined,允许指定名称:如果您能阅读文档,那就太好了:https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/TimeWindowedKStream.html#aggregate-org.apache.kafka.streams.kstream.Initializer-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Materialized- - Matthias J. Sax
我已经阅读了文档!我不想创建一个新的状态存储并将其放在我的文件系统中,只是为了能够命名一个主题。 - Chris
显示剩余5条评论
5个回答

2
通常,中间主题的名称是按照以下约定构建的:
<ApplicationId>-<operator name>-<suffix>

后缀值可以是"changelog"或"repartition"

根据操作符,它使用其中一个后缀。以下是一个例子:

testapplication-aggregate-repartition

testapplication-aggregate-changelog


所以我们有一些类似于testapplication-KSTREAM-REDUCE-STATE-STORE-0000000008-repartition和testapplication-KSTREAM-REDUCE-STATE-STORE-0000000027-repartition的东西。这些数字是从哪里来的? - Chris
1
这些数字代表着-KSTREAM-REDUCE-STATE-STORE中的处理器节点。拓扑结构由许多处理器节点构建,每个节点都有唯一的ID。因此,默认状态存储命名约定使用这些名称。 - Nishu Tayal
通过查看这些主题名称,我能推断出流处理的顺序吗? - Ihor M.

1

关于整数的主要问题,根据文档,我找到了以下内容:

数字是一个全局递增的数字,表示拓扑中操作者的顺序。生成的数字以变化数量的“0”为前缀,以创建一个始终为10个字符长的字符串。

这是kafka流DSL的一个非常重要的方面,如果您更改拓扑结构可能会导致一些问题。给您的状态运算符命名是一个好习惯。

有关更多信息,请参见 dsl-topology-naming 文章。


0
关于主题名称是否一致的问题,根据我的经验,在应用程序的执行过程中它们是一致的,但是如果您在逻辑中修改连接或减少任何连接或减少的顺序,那么主题名称可能会发生变化。

0

你看过设置ACL给Streams内部主题的这些命令吗?我相信它们是作为Kafka v2.x.x的一部分引入的(confluent doc

   # Allow Streams to manage its own internal topics and consumer groups:
   bin/kafka-acls ... --add --allow-principal User:team1 --operation All --resource- 
   pattern-type prefixed --topic team1-streams-app1 --group team1-streams-app1

所以你只需要知道流应用程序的ID,它是所有内部主题的前缀。

我相信,如果你授予权限ALL,那么这也将允许它们的创建。


-1

整数是内部生成的。

您可以在此处找到文档:

https://docs.confluent.io/current/streams/javadocs/index.html

在groupBy方法的描述中,它说:

因为选择了一个新的键,所以将在Kafka中创建一个内部重新分配主题。此主题将被命名为"${applicationId}-XXX-repartition",其中"applicationId"是通过StreamsConfig参数APPLICATION_ID_CONFIG由用户指定的,"XXX"是一个内部生成的名称,而"-repartition"是一个固定的后缀。您可以通过Topology.describe()检索所有生成的内部主题名称。


这并没有真正回答问题。整数是内部计数器,被视为实现细节。 - Matthias J. Sax
@MatthiasJ.Sax 您是正确的。我知道这是一个实现细节。不幸的是,我们的安全要求不允许我们使用应用程序创建主题,并且需要提前设置。我正在尝试确定这些主题名称是否会保持一致。 - Chris
@MatthiasJ.Sax他编辑了原始问题,但并没有要求实现细节。 - sowieso-fruehling
只要您不更改拓扑结构,名称就不会改变。您可以通过 Topology#describe() 找到名称。@Chris - Matthias J. Sax

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接