理解 Kafka 生产者中的 max.inflight 属性

20

我正在使用Kafka 1.0.0-cp1版本的群集。

在我专注于最大吞吐量、有序保证和没有数据丢失的负载测试中(仅涉及一个分区的主题),我是否需要将max.in.flight.requests.per.connection属性设置为1

我已经阅读了这篇文章

我理解,只有当我在生产者端启用重试功能,并使用retries属性时,才需要将max.in.flight设置为1。

另外一个问法:仅有一个分区+ retries=0(生产者属性)就足以保证Kafka的有序性吗?

我需要知道这一点,因为增加max.in.flight会极大地提高吞吐量。


如果您查看底部的注释,您将看到从max.in.flight.requests.per.connection更正为max.in.flight.requests.per.session - Jayendran
3个回答

26

您的使用场景有些不清楚。您提到了顺序性和无数据丢失,但没有说明是否容忍重复消息。因此,不确定您想要 至少一次(QoS 1)还是 恰好一次

无论如何,由于您正在使用1.0.0版本并仅使用单个分区,建议您查看幂等生产者而不是调整生产者配置。它可以有效地保证顺序性和无数据丢失。

从文档中可以得知:

幂等交付确保消息在单个生产者的生命周期内恰好传递一次到特定的主题分区。

早期的幂等生产者强制将 max.in.flight.requests.per.connection 设置为1(出于您提到的相同原因),但在最新版本中,它现在可以使用 max.in.flight.requests.per.connection 设置为高达5,并仍然保持其保证。

使用幂等生产者,您不仅会获得更强的传递语义(恰好一次而不是至少一次),甚至可能表现更佳!

我建议您检查 [文档中的] 传递语义。

[文档中的]:http://kafka.apache.org/documentation/#semantics


回到您的问题

是的,在没有幂等(或事务性)生产者的情况下,如果您想要避免数据丢失(QoS 1)并保留顺序,必须将 max.in.flight.requests.per.connection 设置为1,允许 retries 并使用 acks=all。但这会带来显着的性能损失。


8
如果您想保持消息顺序,我不确定您是否可以将max.in.flight.requests.per.connection设置为5。如果一个消息被拒绝并需要重试,但在此期间第二个消息已经被发送并写入主题,那么第一个消息将在第二个消息之后写入主题。为避免这种情况,您只能有一条消息在传输中。如果您只想要确切一次交付,我了解您可以将此属性设置为5。 - Pablo Antequera
2
@PabloAntequera:我不知道之前的版本,但是在最近的Kafka版本中,启用enable.idempotence=true可以防止重复消息,并保持消息的顺序,即使max.in.flight.requests.per.connection > 1。详细解释请参见:https://docs.confluent.io/cloud/current/client-apps/optimizing/durability.html#duplication-and-ordering - Sarye Haddadi
发送到不同分区的每个连接时,它是否保留发送顺序?我的意思是,如果一个发送失败了,后面的消息是否有可能成功。 - Jimmy Guo
如果不需要幂等性,那么在生产者中将最大并发设置为1。否则将幂等性设置为true。实际上,将幂等性设置为true应该比将最大并发设置为1更高效,并且它还可以免费提供幂等性。我认为原因是,代理知道生产者正在进行幂等会话,所以有一个顺序号。如果它发现缺少序列号,它会通知生产者并停止处理消息。生产者然后暂时切换到最大并发= 1。 - Apurva Singh

6
是的,您必须将max.in.flight.requests.per.connection属性设置为1。 在您阅读的文章中,作者最初犯了一个错误(目前已更正),其中写道:

max.in.flights.requests.per.session

这在Kafka文档中不存在。
这个勘误可能来自于书籍《Kafka权威指南》(第一版),您可以在第52页上读到以下内容:

<...因此,如果保证顺序很重要,我们建议将in.flight.requests.per.session=1设置为确保在批处理消息正在重试时,不会发送其他消息...>


6
我认为,了解这个问题也是非常宝贵的,因为它使事情变得更有趣,但稍微复杂了一些。
当启用enable.idempotence=true时,每次向代理发送消息时,都会附带一个从零开始的序列号。代理也会在其端存储该序列号。当您向代理发起下一个请求(例如使用sequence_id=3),则代理可以查看其当前存储的序列号并执行以下操作:
  • 如果其为4,则表示是新的记录批次。
  • 如果其为3,则表示为重复记录。
  • 如果其为5或更高,则表示丢失消息。
现在考虑max.inflight.requests.per.connection。生产者可以同时发送多达此值并行请求,并且无需等待代理的答复。当我们达到3(假设max.inflight.requests.per.connection=3)时,我们开始向代理请求先前的结果(同时即使准备好了批次,我们也不能处理任何批次)。
现在,仅出于例子的目的,假设代理说:“1很好,我存储了它”,“2失败了”,现在重要的部分是:因为2失败了,您只能得到“顺序错误”的3,这意味着它没有存储。客户端现在知道需要重新处理23,并创建一个列表并按照确切的顺序重新发送它们——如果启用重试。
虽然这个解释可能过于简单化,但这是我在阅读源代码后的基本理解。

我的理解与你的一样,基于这个链接:https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#enable-idempotence,不过 OP 的问题是针对一个旧版本的 Kafka。 - keemahs

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接