为什么在关闭Amazon Kinesis流的分片时需要检查点?

5
当将一个分片分成两个子分片时,它的父分片会关闭。期望记录处理器(这里使用了KCL)在这种情况下进行检查点,如下面的KCL源代码所示:
try {
                recordProcessor.shutdown(recordProcessorCheckpointer, reason);
                String lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
                if (reason == ShutdownReason.TERMINATE) {
                    if ((lastCheckpointValue == null)
                            || (!lastCheckpointValue.equals(SentinelCheckpoint.SHARD_END.toString()))) {
                        throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
                                + shardInfo.getShardId());
                    }
                }

问题如下:
  • 这个检查点是否必不可少?

  • 如果记录处理器没有进行检查点并吸收异常会发生什么?

我提出这些问题的原因是因为在我的使用场景中,我想确保从流中读取的每个记录都已经被处理并存储到S3中,现在如果分片被关闭,可能仍有尚未刷新的数据项,因此我想确保它们能够重新发送给子分片的新消费者/工作者?如果我执行检查点,它们将不会被重新发送。
有什么想法吗?
提前感谢。
1个回答

8
物品不会在分片之间移动。重新分片后,新记录将放入新的分片中,但旧记录永远不会从父分片转移,也不会再向(现在关闭的)父分片添加新记录。即使在关闭后,数据仍然在父分片中保持其正常的24小时生命周期。只有在处理完来自父分片的数据后,您的记录处理器才会被关闭。

http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-after-resharding.html

顺便说一句,正如你可能已经知道的那样,SDK API很难,而客户端库也不是很好。尝试使用连接器库,它是一个更好的API,并包括一个S3存档应用程序的示例。

https://github.com/awslabs/amazon-kinesis-connectors


谢谢。这很有用。不过我有一个问题,根据你所说,如果我将1000条记录放入一个分片流中。然后在消耗了500个记录后,我将流拆分为两个。假设我使用KCL(而不是使用原生API读取),那么KCL的行为会是什么样子?它会确保我有一段时间有3个RecordProcessors直到剩下的500个被消耗掉,还是说它只会为子分片设置2个RecordProcessors,并且基本上剩下的500个记录会丢失?提前感谢。 - isaac.hazan
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - engineerC
为了澄清,该库将为子分片生成两个额外的处理器,但在添加更多记录之前,它们将不起作用。所有1000条记录都将由原始的父分片处理器处理,因为那里是这些记录存在的地方。 - engineerC
抱歉我没有表达清楚,实际消耗没有影响,那只是潜在数据丢失的一个例子。我会更明确一些,当分片被拆分时,我清楚地看到我的记录处理器上调用了一个关闭方法。我将此关闭解释为:“不再向您发送记录”。问题是:KCL是否保证在将所有待处理记录发送给处理器后调用关闭方法,还是在拆分后立即调用关闭方法,这种情况下我基本上丢失了未消耗的其余500条记录,因为我的处理器正在关闭? - isaac.hazan
当到达关闭分片数据的末尾(当分片迭代器返回null时),您将被关闭,而不是在分片关闭时。您不会丢失记录。 - engineerC
Thx. Accepting and upvoting. - isaac.hazan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接