Kafka Connect S3连接器使用TimeBasedPartitioner时出现内存溢出错误

23

我目前正在使用Kafka Connect S3 Sink Connector 3.3.1将 Kafka 消息复制到 S3,并且在处理迟到的数据时遇到了 OutOfMemory 错误。

我知道这看起来是一个很长的问题,但我尽力使其清晰简单易懂。 非常感谢您的帮助。

高级信息

  • 连接器对Kafka消息进行简单的字节到字节的复制,并在字节数组的开头添加消息长度(用于解压缩)。
    • 这是CustomByteArrayFormat类的职责(请参见下面的配置)
  • 根据Record时间戳将数据分区和分桶
    • CustomTimeBasedPartitioner扩展了io.confluent.connect.storage.partitioner.TimeBasedPartitioner,其唯一目的是覆盖generatePartitionedPath方法以将主题放在路径末尾。
  • Kafka Connect进程的总堆大小为24GB(仅一个节点)
  • 连接器每秒处理8,000至10,000条消息
  • 每条消息的大小接近1 KB
  • Kafka主题有32个分区

OutOfMemory错误的上下文

  • 只有当连接器关闭了几个小时并且需要赶上数据时,才会发生这些错误
  • 当将连接器打开时,它开始赶上,但很快就会发生OutOfMemory错误

可能但不完整的解释

  • 当发生OOM错误时,连接器的timestamp.extractor配置设置为Record
  • 将此配置切换到Wallclock(即Kafka Connect进程的时间)不会引发OOM错误,并且所有延迟数据都可以处理,但是延迟数据不再正确分桶
    • 所有延迟数据将被桶入在连接器重新打开的时间的YYYY/MM/dd/HH/mm/topic-name
  • 因此,我猜想,当连接器尝试根据Record时间戳正确地对数据进行分桶时,它会进行太多的并行读取,从而导致OOM错误
    • "partition.duration.ms": "600000"参数使连接器在每小时内以六个10分钟的路径桶装数据(2018/06/20/12/[00|10|20|30|40|50]表示2018-06-20下午12点)
    • 因此,对于24小时的延迟数据,连接器必须在24*6 = 144个不同的S3路径中输出数据。
    • 每个10分钟文件夹包含10,000条消息/秒* 600秒=6,000,000条消息,大小为6 GB
    • 如果确实进行并行读取,那么将有864GB的数据进入内存
  • 我认为我必须正确配置一组给定的参数才能避免这些OOM错误,但我觉得我没有看到大局
    • "flush.size": "100000"意味着如果读取的消息超过100,000条,则应将它们提交到文件中(从而释放内存)
      • 对于1KB的消息,这意味着每100MB提交一次
      • 但是,即使有144个并行读取,仍然只会得到总共14.4GB,这比可用的24GB堆大小要小
      • "flush.size"是每个分区读取的记录数之前提交我的主要问题是,考虑到以下因素,哪些数学知识可以帮助我计划内存使用:
        • 每秒记录的数量
        • 记录的大小
        • 从中读取的Kafka主题的分区数
        • 连接器任务的数量(如果相关)
        • 每小时写入的桶的数量(这里是6,因为配置了“partition.duration.ms”:“600000”)
        • 处理迟到数据的最大时间(以小时为单位)

        配置

        S3接收器连接器配置

        {
          "name": "xxxxxxx",
          "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "s3.region": "us-east-1",
            "partition.duration.ms": "600000",
            "topics.dir": "xxxxx",
            "flush.size": "100000",
            "schema.compatibility": "NONE",
            "topics": "xxxxxx,xxxxxx",
            "tasks.max": "16",
            "s3.part.size": "52428800",
            "timezone": "UTC",
            "locale": "en",
            "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
            "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
            "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
            "name": "xxxxxxxxx",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "s3.bucket.name": "xxxxxxx",
            "rotate.schedule.interval.ms": "600000",
            "path.format": "YYYY/MM/dd/HH/mm",
            "timestamp.extractor": "Record"
        }
        

        工作者配置

        bootstrap.servers=XXXXXX
        key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
        internal.key.converter=org.apache.kafka.connect.json.JsonConverter
        internal.value.converter=org.apache.kafka.connect.json.JsonConverter
        internal.key.converter.schemas.enable=false
        internal.value.converter.schemas.enable=false
        consumer.auto.offset.reset=earliest
        consumer.max.partition.fetch.bytes=2097152
        consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
        group.id=xxxxxxx
        config.storage.topic=connect-configs
        offset.storage.topic=connect-offsets
        status.storage.topic=connect-status
        rest.advertised.host.name=XXXX
        

        编辑:

        我忘记添加我遇到的错误示例:

        2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
        java.lang.OutOfMemoryError: Java heap space
        [2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
        [2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
        org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
        
2个回答

38

我终于能够理解 Kafka Connect S3 Connector 中的堆大小使用方式了

  • S3 连接器将每个 Kafka 分区的数据写入分区的路径
    • 这些路径的划分方式取决于 partitioner.class 参数;
    • 默认情况下,是按时间戳划分的,而partition.duration.ms 的值将确定每个分区的持续时间。
  • S3 连接器将为每个 Kafka 分区(读取的所有主题)和每个分区的路径分配一个缓冲区,大小为s3.part.size 字节
    • 例如:读取20个分区,将 timestamp.extractor 设置为 Recordpartition.duration.ms设置为1小时,s3.part.size设置为50 MB
      • 每小时所需的堆大小为 20 * 50 MB = 1 GB;
      • 但是,由于timestamp.extractor设置为Record,具有对应于比读取时早的小时的时间戳的消息将会被缓冲在此更早的小时缓冲区中。因此,实际上,连接器将需要至少 20 * 50 MB * 2h = 2 GB 的内存,因为始终存在延迟事件,如果有延迟超过1小时的事件,则需要更多的内存;
    • 需要注意的是,如果将 timestamp.extractor 设置为Wallclock,那么就不适用于 Kafka Connect 的晚到事件了。
  • 这些缓冲区在以下 3 种情况下会被刷新(即离开内存):
    • rotate.schedule.interval.ms 时间已过
      • 此刷新条件总是触发。
    • 基于 timestamp.extractor 时间,已经过了 rotate.interval.ms 时间
      • 这意味着,如果将 timestamp.extractor 设置为Record,则可以在少于或多于10分钟的时间内通过 Record 时间传递10分钟的数据。
        • 例如,在处理迟到的数据时,10分钟的数据将在几秒钟内处理完毕,如果 rotate.interval.ms 设置为10分钟,则该条件将每秒触发一次(应该如此);
        • 相反地,如果事件流中有一个暂停,除非看到带有时间戳的事件,表明已经过去了超过rotate.interval.ms自上次触发条件以来的时间,否则该条件将不会触发。
    • 在少于min(rotate.schedule.interval.ms,rotate.interval.ms)的时间内已读取 flush.size 条消息
      • 关于rotate.interval.ms,如果没有足够的消息,此条件可能永远不会触发。
  • 因此,您需要至少计划Kafka partitions * s3.part.size堆大小。
    • 如果您使用Record时间戳进行分区,则应将其乘以max lateness in milliseconds / partition.duration.ms
      • 这是最坏的情况,其中所有分区都存在常规延迟事件,并且涉及到max lateness in milliseconds的所有范围。
  • 当S3连接器从Kafka读取时,还将每个分区缓冲consumer.max.partition.fetch.bytes字节。
    • 默认情况下,这设置为2.1 MB。
  • 最后,您不应认为所有堆大小都可用于缓冲Kafka消息,因为其中还有许多不同的对象。
    • 一个安全的考虑是确保Kafka消息的缓冲不超过总可用堆大小的50%。

  • 1
    非常感谢您提供详细的答案。如果有更多像您这样的人,互联网将会变得更加美好。 - Guido
    2
    谢谢Guido,非常感谢。我很高兴它有用。 - raphael
    @raphael 如果你的连接器正在汇聚多个主题,您是否也需要将计算乘以主题的数量? - moku
    1
    @moku 不是按主题数量乘以,而是按包括所有主题的总分区数乘以。 例如你有一个有10个分区的主题和另一个有24个分区的主题,那么你需要乘以34。 - raphael

    1
    @raphael已经完美地解释了工作原理。下面是我遇到过的一个类似问题的小变化(事件太少,但跨越多个小时/天)。
    在我的情况下,我有大约150个连接器,其中8个由于需要处理约7天的数据(我们的测试环境中的kafka停机约2周)而失败。
    所采取的步骤:
    1. 将所有连接器的s3.part.size从25MB减少到5MB。(在我们的场景中,rotate.interval设置为10分钟,flush.size设置为10000。大多数事件应该很容易适合这个限制)。
    2. 在进行此设置后,只有一个连接器仍然出现OOM,并且此连接器在启动后的5秒内进入OOM状态(基于堆分析),它从200MB-1.5GB的堆利用率迅速上升。查看kafka偏移滞后时,跨越7天只有8K事件需要处理。因此,这不是因为要处理太多事件,而是因为需要处理/刷新的事件太少。
    3. 由于我们使用了每小时分区,并且在一个小时内几乎只有100个事件,所以这7天的所有缓冲区都在没有刷新(没有释放到JVM)的情况下被创建 - 7 * 24 * 5MB * 3 partitions = 2.5GB(xmx-1.5GB)。

    修复:

    执行以下步骤之一,直到您的连接器赶上并恢复旧配置。 (推荐方法-1)

    1. 将连接器配置更新为处理100或1000条记录flush.size(取决于数据结构)。
      缺点: 如果实际事件超过1000,则会在每小时创建太多小文件。
    2. 将分区更改为每日,这样只会有每日分区。
      缺点: 现在您的S3中将有混合的按小时和按天分区。

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接