同一个 Kinesis 数据流的多个不同消费者

5
我有一个Kinesis生产者,它向流中写入单一类型的消息。我想在多个完全不同的消费者应用程序中处理此流。因此,对于给定的主题/流,使用单个发布者的发布/订阅模式。我还希望利用检查点来确保每个消费者处理写入流的每条消息。
最初,我为所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动了多个消费者,就会出现以下错误:
com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 used in GetShardIterator on shard shardId-000000000000 in stream PackageCreated under account ************ is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ..)
这似乎是因为消费者在使用相同的应用程序名称时与其检查点发生冲突。
从文档中阅读,似乎唯一实现带有检查点的发布/订阅模式的方法是针对每个消费者应用程序拥有一个流,这要求每个生产者都知道所有可能的消费者。这比我想要的更紧密耦合;实际上只是一个队列。
看来Kafka支持我想要的:任意消费给定主题/分区,因为消费者完全控制自己的检查点。如果我想要带有检查点的发布/订阅模式,我的唯一选择是转向Kafka或其他替代方案吗?
我的RecordProcessor代码在每个消费者中完全相同:
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
  log.trace("Received record(s) from kinesis")
  for {
    record <- processRecordsInput.getRecords
    json   <- jawn.parseByteBuffer(record.getData).toOption
    msg    <- decode[T](json.toString).toOption
  } yield subscriber ! msg
  processRecordsInput.getCheckpointer.checkpoint()
}

代码解析消息并将其发送给订阅者。目前,我只是将所有消息标记为成功接收。我可以在AWS Kinesis仪表板上看到消息被发送,但没有读取发生,可能是因为每个应用程序都有自己的AppName,并且不会看到其他消息。

1
我很难理解你是如何得到这个错误的。你能展示一下每个消费者获取 shard 迭代器并读取记录的代码片段吗?基本上,Kinesis 就是为了这个目的而构建的。我有 4 个 Lambda 函数,每个函数以不同的方式处理记录,但都消费了完全相同的记录。 - johni
@johni,我已经添加了我用来解析记录的代码。 - CalumMcCall
1个回答

14

您想要的这种模式,即一个Kinesis流向多个消费者的发布者支持。 您不需要为每个消费者单独创建流。

如何做到这一点? 您需要为每个消费者提供不同的应用程序名称。 这样,一个消费者的检查点信息就不会与另一个消费者的相冲突。

请查看此问题的第一个回答:https://forums.aws.amazon.com/message.jspa?messageID=554375


好的,那么我在实现的其他地方肯定做错了什么。感谢您帮助澄清我正在正确的轨道上。 - CalumMcCall
3
同一个应用程序中如何处理多个消费者?您可以拥有同一服务/应用程序的多个实例池从同一个流中读取吗? - MadHacker
1
链接已损坏。 - Daenyth

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接