Explain Kinesis Shard Iterator - AWS Java SDK

12

好的,我会以一个详细的用例开始,并解释我的问题:

  1. 我使用第三方Web分析平台,该平台利用AWS Kinesis流将数据从客户端传递到最终目的地 - Kinesis流;
  2. Web分析平台使用2个流:
    1. 数据收集器流(单切片流);
    2. 第二个流用于从收集器流中丰富原始数据(单切片流);最重要的是,此流使用TRIM_HORIZON迭代器类型从第一个流中消耗原始数据;
  3. 我使用AWS Java SDK从流中获取数据,具体地使用GetShardIteratorRequest类;
  4. 我目前正在开发提取类,因此这是同步完成的,这意味着我只在编译类时消耗数据;
  5. 该类出人意料地可以正常工作,尽管有一些事情我不理解,特别是关于如何从流中消耗数据以及每个迭代器类型的含义。

我的问题是我检索到的数据不一致,并且没有时间顺序的逻辑。

  • 当我使用AT_SEQUENCE_NUMBER并提供来自带有

    .getSequenceNumberRange().getStartingSequenceNumber();

    …的分片的第一个序列号作为``时,我没有获取到所有记录。类似地,AFTER_SEQUENCE_NUMBER

  • 当我使用LATEST时,我得不到任何结果;
  • 当我使用TRIM_HORIZON时,它应该是有用的,但似乎不能正常工作。它曾经为我提供数据,然后我添加了新的“事件”(记录到最终流)并收到零条记录。神秘。

我的问题是:

  1. 如何安全地从流中消耗数据,而无需担心丢失的记录?
  • 是否有替代ShardIteratorRequest的方法?
  • 如果有,我如何只是“浏览”流并查看其中的内容以进行调试参考?
  • TRIM_HORIZON方法有什么缺陷吗?
  • 提前感谢,我真的很想了解有关从Kinesis流中消耗数据的更多信息。


    我也有类似的问题-虽然对我来说,在每次迭代中都会出现重复记录(使用AT_SEQUENCE_NUMBER和FROM_SEQUENCE_NUMBER),尽管我使用了每个响应的NextShardIterator值。文档在这个问题上有些神秘....我也很想知道“未修剪”的含义(关于TRIM_HORIZON)。 - Erve1879
    记录一下,期间我做了一些不同的事情 - 我将一个现有的Scala消费者应用程序进行了修改,使其能够连续监听流,并将其移植回纯Java以适应我的需求。这是由SnowPlow最初开发的Scala应用程序:https://github.com/snowplow/kinesis-example-scala-consumer - Yuval Herziger
    很遗憾,我不太擅长Java......!我只希望有一些与语言无关的、清晰的指南,可以确保幂等性和100%的记录“覆盖率”,同时允许消费者重新启动、崩溃等。如果我们必须保存并检查所有先前获取的记录的SequenceNumber以确保没有重复,那似乎会抵消Kinesis的目的。虽然我肯定错过了什么....... - Erve1879
    你尝试过使用亚马逊自己的库吗? https://github.com/awslabs/amazon-kinesis-connectors https://github.com/awslabs/amazon-kinesis-client 这些库(特别是连接器)处理所有繁琐的事情,如定位检查点、继续处理分片等。 - az3
    昨晚我实际上已经做了。我只需要时间来调查Kinesis的内部细节,KCL是一个很棒的库。我很快就会在这里回答自己的问题,结果证明这一切都与检查点有关。 - Yuval Herziger
    1
    我在不使用KCL的JSON API时遇到了类似的问题。我想要将最后一条记录作为检查点。LATEST返回一个空数组。TRIM_HORIZON目前给我8条记录。我可以遍历所有记录(可能有数千条)来获取最后一条记录,但这似乎很荒谬。最新的应该如何工作? 无论KCL正在做什么,它都应该使用完全相同的API,说“使用KCL”并不能回答问题,它的检查点应该仅基于此API和存储的结果。 - Buzzware
    2个回答

    7
    我了解上面的困惑,并且我也遇到了同样的问题,但是现在我想我已经弄清楚了。请注意,我直接使用JSON API而没有使用KCL。
    看起来API在开始消费流时给客户端提供了2个基本的迭代器选择:
    A)TRIM_HORIZON:用于阅读过去记录,延迟介于几分钟(甚至几小时)和24小时之间。它不会返回最近放置的记录。在此迭代器看到的最后一条记录上使用AFTER_SEQUENCE_NUMBER将返回一个空数组,即使最近PUT了记录。
    B)LATEST:用于实时阅读未来记录(即它们被PUT后立即)。我被文档中唯一能找到的句子所欺骗:“从片段中最新的记录之后开始阅读,以便您始终阅读片段中最新的数据。”你得到一个空数组,因为自获取迭代器以来没有记录被PUT。如果你获取这种类型的迭代器,然后PUT一条记录,那么该记录将立即可用。
    最后,如果你知道最近放置的记录的序列号,你可以使用AT_SEQUENCE_NUMBER立即获取它,你可以使用AFTER_SEQUENCE_NUMBER获取后续记录,即使它们对TRIM_HORIZON迭代器来说看起来是不存在的。
    上述意味着如果你想阅读所有已知的过去记录和未来记录,你必须使用A和B的组合,并处理其中的记录(最近的过去)。KCL可能会很好地解决这个问题。

    2
    AWS自己都无法创建一个像样的API。对于我的下一个项目,我将转向Google Cloud。它几乎不可能更糟糕了。 - Samantha Atkins

    3
    很多时间已经过去了,也许曾经存在的Kinesis错误现在已经得到解决。
    提供一点可视化:
    oldest-records          <-- time -->             newest-records
    |<-- TRIM_HORIZON             |<-- AT_SEQUENCE_NUMBER(n+15)   |<-- LATEST
    n                             n+15                          n+30
    | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ?
                                    n+15+1                        eos
                                    |<--AFTER_SEQUENCE_NUMBER(n+15)
    

    其中n是相应分片中最旧记录的序列号。

    • TRIM_HORIZONLATEST应该不言自明。
      • 也许EARLIESTTRIM_HORIZON更直观。
      • LATEST可以被认为是
        • AFTER_SEQUENCE_NUMBER对于n+30
        • AT_SEQUENCE_NUMBER对于eos的同义词。
    • 我想,选择AFTER_SEQUENCE_NUMBERAT_SEQUENCE_NUMBER取决于您是否已经处理了该序列号处的记录。

    通过正确使用各自的API(即没有PEBKAC),我期望TRIM_HORIZON返回当前可用的所有内容。


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接