如何处理Amazon Kinesis记录的重复数据?

24
根据Amazon Kinesis Streams文档,一条记录可能会传递多次。
确保只处理每个记录一次的唯一方法是将它们暂时存储在支持完整性检查的数据库中(例如DynamoDB,Elasticache或MySQL / PostgreSQL),或仅为每个Kinesis分片检查点RecordId。
您知道更好/更有效处理重复项的方式吗?
2个回答

27
在构建移动应用的遥测系统时,我们遇到了这个问题。在我们的情况下,我们也不确定生产者是否确保每条消息仅发送一次,因此对于每个收到的记录,我们会即时计算其MD5,并检查它是否出现在某种形式的持久性存储中,但实际上要使用哪种存储是最棘手的部分。
首先,我们尝试使用传统关系数据库,但它很快成为整个系统的主要瓶颈,因为这不仅是一个读重型的案例,而且也是一个写重型的案例,因为通过 Kinesis 传输的数据量相当大。
最终,我们使用DynamoDB表来存储每个唯一消息的MD5。我们遇到的问题是删除消息并不容易——尽管我们的表包含分区和排序键,但 DynamoDB 不允许删除具有给定分区键的所有记录,我们必须查询所有记录以获取排序键值(这浪费时间和容量)。不幸的是,我们只能偶尔简单地删除整个表格。另一种次优解决方案是定期轮换存储消息标识符的 DynamoDB 表格。
然而,最近 DynamoDB 推出了一个非常方便的功能-过期时间,这意味着现在我们可以通过启用每条记录的自动过期来控制表格的大小。从这个角度来看,DynamoDB 在某种程度上类似于ElastiCache,但是ElastiCache(至少Memcached集群)的耐久性要低得多——那里没有冗余,所有驻留在终止节点上的数据都会在缩放操作或故障的情况下丢失。

2
嗨,Dmitry。我正在运行几个基准测试,使用类似于此处解释的JustGiving基础架构:https://aws.amazon.com/blogs/compute/serverless-cross-account-stream-replication-using-aws-lambda-amazon-dynamodb-and-amazon-kinesis-firehose/。为什么你要计算MD5校验和,而不是在DDB表中使用Shardid + SequenceNumber? - Antonio
4
嗨@Antonio。在我们的情况下,生产者有可能会发布相同的消息多次。如果是这种情况,那么Kinesis无论如何都会将它们视为不同的消息(因为生产者发布了2个或更多帖子)。由于我们知道每条消息都必须是唯一的,所以我们简单地忽略了md5已经被看到的消息。此外,生产者计算了md5,为消费者节省了一些计算时间(鉴于通过Kinesis传输的数据量相对较大)。 - Dmitry Deryabin
1
只是想提出来 - AWS指出,由于错误情况,不同的生产者自然会多次生成相同的记录,而更常见的是,多个消费者可能会拉取相同的记录集。我现在也在处理我们系统中的这个问题。我们使用elasticsearch,并且目前的计划是使用elastics内置的版本控制来确保不会同时更新同一条记录,然后在记录本身上备忘最近应用于记录的事件列表。 - genexp
TTL 真的是一个改变游戏规则的东西。我不想再为删除记录支付任何额外的费用。Dynamodb 会处理删除操作,需要最多 48 小时来删除记录。 - Ankur Kothari

16
你提到的问题是所有采用“至少一次”处理方式的队列系统普遍存在的问题。此外,不仅是队列系统,生产者和消费者也可能多次处理同一条消息(由于ReadTimeout错误等原因)。Kinesis和Kafka都使用了这种范例。不幸的是,对此并没有简单的答案。
你可以尝试使用“精确一次”的消息队列,采用更严格的事务处理方式。例如,AWS SQS就是这样做的:https://aws.amazon.com/about-aws/whats-new/2016/11/amazon-sqs-introduces-fifo-queues-with-exactly-once-processing-and-lower-prices-for-standard-queues/。请注意,SQS的吞吐量比Kinesis要小得多。
为了解决你的问题,你应该了解你的应用领域,并尝试像你建议的那样在内部解决它(例如数据库检查)。特别是当你与外部服务通信时(比如说一个邮件服务器),你应该能够恢复操作状态,以防止重复处理(因为在邮件服务器示例中,重复发送可能会导致接收者邮箱中有多个相同的帖子副本)。
另请参见以下概念:
1. 至少一次交付:http://www.cloudcomputingpatterns.org/at_least_once_delivery/ 2. 精确一次交付:http://www.cloudcomputingpatterns.org/exactly_once_delivery/ 3. 幂等处理器:http://www.cloudcomputingpatterns.org/idempotent_processor/

谢谢您的回答。由于吞吐量很高,我无法使用SQS。高吞吐量也是我正在使用不同持久性存储(Mysql / PgSQL / Aurora / ElasticSearch / DynamoDB)对几种解决方案进行基准测试的原因。 暂时存储事件ID的最佳方法是Redis,但ElastiCache无法保证数据持久性。这就是为什么我在寻找其他方法的原因。 - Antonio
1
Redis可以为您提供严格的事务跟踪,但它是单节点的,而RDS太慢了,您是正确的。 DynamoDB似乎是您唯一的PaaS解决方案。但是,如果您想管理EC2实例,可以尝试内存集群解决方案,例如Hazelcast或VoltDB(在许多r3节点上)? - az3
1
内存数据库不具备持久性。如果您的Hazelcast集群失败,您将无法了解哪些消息已经被处理。:( - Antonio

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接