增加Kafka Streams消费者吞吐量

7
我有一个 Spark Streaming 应用程序和一个 Kafka Streams 应用程序并行运行,用于基准测试。两者都从相同的输入主题消费并写入不同的目标数据库。输入主题有15个分区,Spark Streaming 和 Kafka Streams 都有15个消费者(1:1比例)。此外,事件负载大约为2kb。不确定是否相关,但是 Spark Streaming 的90%执行时间约为9毫秒。Kafka Streams 为12毫秒。每次处理消息时,在我的 Processor 中调用 commit() 方法。
问题在于高峰值。Spark Streaming 可以跟上每秒700个,而 Kafka Streams 只能达到每秒60/70个。我无法超越这一点。请参见下面的图表:(绿线 - Spark Streaming / 蓝线 - Kafka Streams)

Green Line - Spark Streaming / Blue line - Kafka Streams

根据以下配置,只要每个消费者的事件不超过1000个,考虑到反压,Spark Streaming 可以跟上速度,与每个分区的字节数无关。至于 Kafka Streams,如果我正确理解其配置(请纠正我),基于下面相同的条件,我可以在每 100 毫秒(poll.ms)内获取最多 1000 条记录(max.poll.records),只要它不超过每个分区的 1MB(max.partition.fetch.bytes)和每次提取的 50MB(fetch.max.bytes)。

无论我使用 5、10 还是 15 个消费者,我都看到了相同的结果(每秒卡在 70 个事件),这让我认为这与配置有关。我试图通过增加每次提取的记录数和每个分区的最大字节数来调整它们,但没有得到显著的结果。

我知道这些是不同的技术,用于不同的目的,但我想知道在 Kafka Streams 中应该使用什么值来获得更好的吞吐量。

Spark Streaming 配置:

spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100

Kafka流配置(所有字节和时间相关)

# Consumer Config
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
heartbeat.interval.ms = 3000 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 300000 
max.poll.records = 1000 
request.timeout.ms = 30000
enable.auto.commit = false

# StreamsConfig
poll.ms=100 

处理器代码
public class KStreamsMessageProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String payload) {

        ResponseEntity responseEntity = null;
        try {

          // Do Some processing

        } catch (final MyException e) {

          // Do Some Exception Handling

        } finally {

            context.forward(UUID.randomUUID().toString(), responseEntity);
            context.commit();
        }
    }

提前感谢!


1
不是专家:我认为问题可能在Kafka的提交上。为了确保,我在您发布的配置中没有看到禁用自动提交(enable.auto.commit)。我也很惊讶您消耗了70个事件,而不管消费者的数量...我想知道所有记录是否最终都会进入同一个分区,您能验证一下负载是否在所有分区之间平衡吗?查看代码可能也有帮助!也许还要看一下主题配置。 - Augusto
嗨,Augusto,感谢您的输入。刚刚添加了代码以帮助解决问题。所有分区都与消费者保持良好平衡。是的,自动提交已禁用。如果您想查看其他配置,请告诉我。 - Guilherme Alcântara
2个回答

5

更新

Kafka Streams写入的数据库是这里的瓶颈。在我们将其切换到更好的集群(更好的硬件,内存,核心等)之后,我使用下面的配置进行了优化,并能够每秒消耗约2k个事件。提交间隔配置也已更改(根据Augusto的建议),并使用G1GC垃圾收集器。

fetch.max.bytes = 52428800
max.partition.fetch.bytes = 1048576 

fetch.max.wait.ms = 1000 
max.poll.records = 10000 
fetch.min.bytes = 100000
enable.auto.commit = false

1


4
如果我理解正确它的配置(请纠正我),那么基于以下相同的内容,只要不超过每个分区1MB(max.partition.fetch.bytes)和每次最多50MB(fetch.max.bytes),我可以在每100ms(poll.ms)内获取1000条记录(max.poll.records)。但这是不正确的。:) max.poll.records指定poll()函数可以返回多少记录-如果单个对代理的“获取”操作返回更多记录,则下一个“poll()”调用将从消费者的内部缓冲区提供服务(即,没有网络请求)。max.poll.records基本上是一个旋钮来调节您的应用程序代码,即,在调用poll()之前要处理多少条记录。频繁地调用poll()使您的应用程序更具响应性(例如,当调用poll()时才会发生重新平衡-还需要频繁调用poll()以避免违反max.poll.interval.ms)。poll.ms是在没有数据可用的情况下poll()中的最大阻塞时间。这可以避免忙等待。但是,如果有数据,poll()将立即返回。因此,实际的“网络吞吐量”仅基于“获取请求”设置。

1
嗨,马蒂亚斯,再次感谢您的建议!现在更有意义了。 - Guilherme Alcântara

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接