AWS Kinesis ShardIteratorType TRIM_HORIZON 的预期行为

14
背景: 我并不是指基于KCL的应用程序,而是纯粹的Kinesis API调用。

使用TRIM_HORIZON分片迭代器类型是否会立即为您提供流中最早发布的记录(即在Kinesis内置的24小时窗口内最早可用的记录),还是仅为某个时间段(最多24小时)提供迭代器/光标,您必须使用它来沿着流前进,直到达到最早发布的记录?

换句话说,如果不太清楚....

当使用TRIM_HORIZON分片迭代器类型时,预期行为是否将从返回24小时前可用的记录开始,但如果恰好24小时前没有发布任何记录,而是3小时前,那么您的应用程序将需要通过先前的21小时进行迭代轮询,才能到达发布的3小时前的记录?

时间线示例:

  1. 9月29日上午5:00 - 创建一个名为“foo”的流,其中包含1个分片
  2. 9月29日上午5:02 - 将单个记录“Item=A”发布到“foo”流中
  3. 9月29日上午5:03 - 使用TRIM_HORIZON作为您的分片迭代器类型发出GetShardIterator调用,然后使用该分片迭代器发出GetRecords调用并接收记录“Item=A”
  4. 9月30日上午7:02 - 将第二个记录“Item=B”发布到“foo”流中
  5. 9月30日上午7:03 - 使用TRIM_HORIZON作为您的分片迭代器类型发出GetShardIterator调用,然后使用该分片迭代器发出GetRecords调用。此调用应返回什么结果?(注意:我们没有记住/重复使用步骤3中的分片迭代器)
对于上述的第5步骤,自从“Item=A”消息发布到流中已经超过24小时,而“Item=B”刚发布了一分钟。使用TRIM_HORIZON获取新的分片迭代器是否会立即给出最早可用的记录,还是需要不断迭代直到达到某个时期有内容被发布?
我一直在尝试使用Kinesis,昨天或前天一切正常(即我可以无问题地发布和消费)。我对我的代码进行了一些额外的修改,并开始重新发布。当我启动我的消费者后,即使让它运行几分钟,也没有任何输出。我尝试同时发布和消费,但仍然没有任何输出。手动调整AFTER_SEQUENCE_NUMBER迭代器类型,并使用几天前消费者日志中的一些序列号,我才能够看到我最近发布的消息。但是,如果我回到使用TRIM_HORIZON类型,我就完全看不到任何消息。
我看了文档,但大多数文档都假定你正在使用KCL(实际上我最初也在使用KCL,但当它开始失败时,我改用原始API调用),并提到你必须有一个应用程序名称,并且DynamoDB表用于跟踪状态。据我所知,如果你使用纯Kinesis API调用或Kinesis CLI,这些都不是真的。我最终编写了一个纯API脚本,以TRIM_HORIZON开始轮询,无限期地轮询,最终它会碰到新记录(需要约600次迭代;起初落后于“现在”14小时,并在“现在”之前约5小时找到记录)。如果这是预期行为,那么文档中的措辞似乎有点令人困惑/误导:

TRIM_HORIZON-从系统中的最后一个未修剪的记录开始读取分片,即分片中最旧的数据记录。

我曾认为(现在看来是错误的)“最旧的数据记录”这个术语是指我发布到流中的记录,而不仅仅是流中的时间段。如果有人能帮忙确认/解释我所看到的行为,那就太好了。谢谢!
2个回答

1

TRIM_HORIZON会返回流中最旧的记录。

但有时在将shard_iterator_type设置为TRIM_HORIZON时:

 Suppose the value of "millis_behind_latest" in the kinesis response is ~86399000 & your stream retention period is 24 hours(86400000) 

当您使用shard_iterator检索记录时,由于记录的保留期已超过,因此记录已不再流中。因此,您会得到一个空结果,因为最旧的记录已过期并且不再存在于数据流中。因此,shard_iterator现在指向磁盘上的一个空间。
当发生这种情况时,请获取“next_shard_iterator”的值,并再次使用get_records获取Kinesis数据记录。
另外,我们并不完全知道AWS如何管理数据流中的每个分片。数据如何被擦除和添加进去。也许数据没有存储在并发/连续的内存块中,因此我们在检索数据之间会得到空结果。
一直获取“next_shard_iterator”的值并使用get_records,直到“millis_behind_latest”为0。
希望这个答案能有所帮助。 :)

1

它在TRIM HORIZON,或者流TRIMming发生的地方。

当调用时,分片迭代器可能会得到0条记录,因此您需要继续迭代以达到最旧记录所在的区域(如果您不经常推送到流中或存在时间间隔)。getRecords将为您提供下一个可用于迭代的分片迭代器。

来自文档: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

如果迭代器指向的分片部分中没有可用的记录,则GetRecords返回一个空列表。请注意,可能需要多次调用才能到达包含记录的分片部分。


那么,即使有新记录可用,GetRecords返回0条记录是正常/预期的行为吗?是什么决定了切断发生的位置?Kinesis有一个24小时的窗口,但分片迭代器并不总是落后24小时。在我的测试中,它落后了14小时,但那里没有任何记录。那个14小时的意义是什么? - jumand
免责声明:我不知道Kinesis内部是如何工作的。我的猜测基于文档和观察行为。回答:是的,你看到的行为就是我也看到的。有时按照分片迭代器获取0条记录是正常的。我猜想Kinesis在内部保留了一个shardIterator ID到记录序列号的映射表,并且随着trim horizon的推进而回收这些序列号。我还猜想这取决于回收发生的时间以及它是否以惰性方式完成。 - Mircea
我猜当你查找序列号相关的内容时,它也会使用分片迭代器id->序列号映射来快速查找数据,然后通过分片id指向的记录进行迭代,找到你需要的序列号。 - Mircea
感谢确认看到类似的行为。在使用“TRIM_HORIZON”类型时,您是否注意到了MillisBehindLatest值的延迟有任何趋势/一致性?我很想知道背后的具体情况。 - jumand

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接