Kafka -> Flink - 性能问题

3
我正在查看一些每秒生成约30,000条消息的kafka主题。我已经设置了一个flink拓扑结构来读取其中一个主题,进行一些聚合(5秒窗口),然后(最终)写入数据库。
当我运行我的拓扑结构并删除除读取->聚合步骤之外的所有内容时,我只能获得每分钟约30,000条消息。没有地方会出现背压(backpressure)。
我做错了什么?
编辑:
1. 我不能更改主题空间中的任何内容。每个主题只有一个分区,而且有数百个主题。 2. 每个消息是一个平均为2-3Kb的压缩thrift对象。
看起来我只能获得约1.5 MB/s。与提及的100MB/s相去甚远。
目前的代码路径:
DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);  
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);

public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;
    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
        Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = timeData.getOtherId();
        tuple4.f2 = timeData.getSections().size();
        tuple4.f3 = mapId;

        collector.collect(tuple4);
    }
}

我有生产者代码并可以从中读取吞吐量。 - ethrbunny
你的生产者每秒钟生产大约30K条消息,但是你的消费者每分钟只能消费大约30K条消息? - Morgan Kenyon
是的,这就是我看到的。我知道一旦数据进入flink,它会非常快地处理。我有另一个topo将>6M条记录/分钟推送到cassandra。我想知道是否存在与kafka消费者FlinkKafkaConsumer081相关的固有瓶颈。请注意,此主题仅具有单个分区。这可能是个问题吗? - ethrbunny
我使用FlinkKafkaConsumer从Kafka读取了大约100MB/s的数据。Kafka消息的大小是多少?消息如何序列化?您如何从Kafka消息中反序列化数据?为了得出我上面提到的数字,我需要对我的序列化程序进行大量优化。 - Robert Metzger
Kafka 分区的数量基本上限制了可以从 Kafka 中读取数据的最大并行度。因此,我建议您将分区数设置为至少与 Flink 作业中的并行度相同的数量。 - Till Rohrmann
显示剩余2条评论
2个回答

5

从代码中我看到两个可能会导致性能问题的组件:

  • FlinkKafkaConsumer
  • Thrift反序列化器

为了了解瓶颈在哪里,我首先会测量Flink从Kafka主题读取原始数据的读取性能。

因此,请您在集群上运行以下代码:

public class RawKafka {

private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);

    dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
        long received = 0;
        long logfreq = 50000;
        long lastLog = -1;
        long lastElements = 0;

        @Override
        public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
            received++;
            if (received % logfreq == 0) {
                // throughput over entire time
                long now = System.currentTimeMillis();

                // throughput for the last "logfreq" elements
                if(lastLog == -1) {
                    // init (the first)
                    lastLog = now;
                    lastElements = received;
                } else {
                    long timeDiff = now - lastLog;
                    long elementDiff = received - lastElements;
                    double ex = (1000/(double)timeDiff);
                    LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
                            timeDiff, elementDiff, elementDiff*ex, (received * 2500) / 1024 / 1024 / 1024);
                    // reinit
                    lastLog = now;
                    lastElements = received;
                }
            }
        }
    });

    env.execute("Raw kafka throughput");
}
}

这段代码用于测量从 Kafka 中读取 50,000 个元素所需的时间,并记录从 Kafka 中读取的元素数量。在我的本地计算机上,吞吐量约为每核心每秒 ~330,000 个元素:

16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0

我很感兴趣看到您从Kafka读取时的吞吐量。


我必须为这个问题道歉。根据你的工具输出,我重新审查了我们在生产者端测量“流量”的方式。我们测量的内容存在一些语义问题,以及其报告的值。长话短说:我能够每秒读取大约5K条消息,并且我怀疑它还会更高。 - ethrbunny
没问题。如果你需要更多的建议来提高性能,请告诉我。 - Robert Metzger
@rmetzger 感谢你提供的工具!使用它,我发现在没有使用特殊的序列化程序(而是使用了“SimpleStringSchema”)的情况下,我的消费速率达到了70 MB/s。这相当于每秒 ~35K 条消息。我想使用 Avro/Byte[] 来优化我的消息处理速度,请问你能指导我从何处开始吗? - shriyog
你想要优化什么? - Robert Metzger
@rmetzger 我目前的吞吐量约为每秒35K条消息。有没有办法提高它,就像你那样每个核心每秒处理约352941个元素一样? - shriyog
运行Kafka代理的服务器的磁盘速度是多少? 我假设您每个服务器只有一个代理,对吗?一个代理可能只能提供原始磁盘速度的80%。 您的每条消息的大小是多少?在我的示例中,我认为只有几个字节。这就是为什么吞吐量如此之高的原因。 - Robert Metzger

1
我从未使用过Flink或其KafkaConsumer,但我在Storm环境中有Kafka的经验。以下是我的一些想法。 Kafka速度如何确定有很多变量。以下是一些需要考虑和调查的事项,请在有更多细节时添加到您的问题中。
- 添加更多分区应该会增加吞吐量,因此添加更多分区和消费者应该会看到性能的某种线性跳跃。 - Kafka吞吐量与消息大小有关。因此,如果您有大型消息,则吞吐量将相应下降。 - 您有任何证据支持您的期望,即Kafka Consumer应该更快吗?虽然我同意30K msg / min真的很慢,但您有证据支持您的期望吗?例如使用FlinkKafkaConsumer进行一般速度测试(类似于this),或者使用普通Kafka消费者来查看消费速度,然后将其与Flink的消费者进行比较?

可能有很多原因导致消费速度缓慢,我试图强调一些一般与Kafka相关的问题。我相信在Flink中可能有一些你不知道可以加快消费速度的方法,因为我从未使用过它。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接