我目前正在使用Kafka Connect S3 Sink Connector 3.3.1将 Kafka 消息复制到 S3,并且在处理迟到的数据时遇到了 OutOfMemory 错误。
我知道这看起来是一个很长的问题,但我尽力使其清晰简单易懂。 非常感谢您的帮助。
高级信息
- 连接器对Kafka消息进行简单的字节到字节的复制,并在字节数组的开头添加消息长度(用于解压缩)。
- 这是
CustomByteArrayFormat
类的职责(请参见下面的配置)
- 这是
- 根据
Record
时间戳将数据分区和分桶CustomTimeBasedPartitioner
扩展了io.confluent.connect.storage.partitioner.TimeBasedPartitioner
,其唯一目的是覆盖generatePartitionedPath
方法以将主题放在路径末尾。
- Kafka Connect进程的总堆大小为24GB(仅一个节点)
- 连接器每秒处理8,000至10,000条消息
- 每条消息的大小接近1 KB
- Kafka主题有32个分区
OutOfMemory错误的上下文
- 只有当连接器关闭了几个小时并且需要赶上数据时,才会发生这些错误
- 当将连接器打开时,它开始赶上,但很快就会发生OutOfMemory错误
可能但不完整的解释
- 当发生OOM错误时,连接器的
timestamp.extractor
配置设置为Record
- 将此配置切换到
Wallclock
(即Kafka Connect进程的时间)不会引发OOM错误,并且所有延迟数据都可以处理,但是延迟数据不再正确分桶- 所有延迟数据将被桶入在连接器重新打开的时间的
YYYY/MM/dd/HH/mm/topic-name
中
- 所有延迟数据将被桶入在连接器重新打开的时间的
- 因此,我猜想,当连接器尝试根据
Record
时间戳正确地对数据进行分桶时,它会进行太多的并行读取,从而导致OOM错误"partition.duration.ms": "600000"
参数使连接器在每小时内以六个10分钟的路径桶装数据(2018/06/20/12/[00|10|20|30|40|50]
表示2018-06-20下午12点)- 因此,对于24小时的延迟数据,连接器必须在
24*6 = 144
个不同的S3路径中输出数据。 - 每个10分钟文件夹包含10,000条消息/秒* 600秒=6,000,000条消息,大小为6 GB
- 如果确实进行并行读取,那么将有864GB的数据进入内存
- 我认为我必须正确配置一组给定的参数才能避免这些OOM错误,但我觉得我没有看到大局
"flush.size": "100000"
意味着如果读取的消息超过100,000条,则应将它们提交到文件中(从而释放内存)- 对于1KB的消息,这意味着每100MB提交一次
- 但是,即使有144个并行读取,仍然只会得到总共14.4GB,这比可用的24GB堆大小要小
"flush.size"
是每个分区读取的记录数之前提交我的主要问题是,考虑到以下因素,哪些数学知识可以帮助我计划内存使用:- 每秒记录的数量
- 记录的大小
- 从中读取的Kafka主题的分区数
- 连接器任务的数量(如果相关)
- 每小时写入的桶的数量(这里是6,因为配置了“partition.duration.ms”:“600000”)
- 处理迟到数据的最大时间(以小时为单位)
配置
S3接收器连接器配置
{ "name": "xxxxxxx", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "s3.region": "us-east-1", "partition.duration.ms": "600000", "topics.dir": "xxxxx", "flush.size": "100000", "schema.compatibility": "NONE", "topics": "xxxxxx,xxxxxx", "tasks.max": "16", "s3.part.size": "52428800", "timezone": "UTC", "locale": "en", "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat", "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "name": "xxxxxxxxx", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "s3.bucket.name": "xxxxxxx", "rotate.schedule.interval.ms": "600000", "path.format": "YYYY/MM/dd/HH/mm", "timestamp.extractor": "Record" }
工作者配置
bootstrap.servers=XXXXXX key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false consumer.auto.offset.reset=earliest consumer.max.partition.fetch.bytes=2097152 consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor group.id=xxxxxxx config.storage.topic=connect-configs offset.storage.topic=connect-offsets status.storage.topic=connect-status rest.advertised.host.name=XXXX
编辑:
我忘记添加我遇到的错误示例:
2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482) java.lang.OutOfMemoryError: Java heap space [2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483) [2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148) org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)