我有一个Kinesis生产者,它向流中写入单一类型的消息。我想在多个完全不同的消费者应用程序中处理此流。因此,对于给定的主题/流,使用单个发布者的发布/订阅模式。我还希望利用检查点来确保每个消费者处理写入流的每条消息。
最初,我为所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动了多个消费者,就会出现以下错误:
com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 used in GetShardIterator on shard shardId-000000000000 in stream PackageCreated under account ************ is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ..)
这似乎是因为消费者在使用相同的应用程序名称时与其检查点发生冲突。
从文档中阅读,似乎唯一实现带有检查点的发布/订阅模式的方法是针对每个消费者应用程序拥有一个流,这要求每个生产者都知道所有可能的消费者。这比我想要的更紧密耦合;实际上只是一个队列。
看来Kafka支持我想要的:任意消费给定主题/分区,因为消费者完全控制自己的检查点。如果我想要带有检查点的发布/订阅模式,我的唯一选择是转向Kafka或其他替代方案吗?
我的RecordProcessor代码在每个消费者中完全相同:
代码解析消息并将其发送给订阅者。目前,我只是将所有消息标记为成功接收。我可以在AWS Kinesis仪表板上看到消息被发送,但没有读取发生,可能是因为每个应用程序都有自己的AppName,并且不会看到其他消息。
最初,我为所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动了多个消费者,就会出现以下错误:
com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 used in GetShardIterator on shard shardId-000000000000 in stream PackageCreated under account ************ is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ..)
这似乎是因为消费者在使用相同的应用程序名称时与其检查点发生冲突。
从文档中阅读,似乎唯一实现带有检查点的发布/订阅模式的方法是针对每个消费者应用程序拥有一个流,这要求每个生产者都知道所有可能的消费者。这比我想要的更紧密耦合;实际上只是一个队列。
看来Kafka支持我想要的:任意消费给定主题/分区,因为消费者完全控制自己的检查点。如果我想要带有检查点的发布/订阅模式,我的唯一选择是转向Kafka或其他替代方案吗?
我的RecordProcessor代码在每个消费者中完全相同:
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
log.trace("Received record(s) from kinesis")
for {
record <- processRecordsInput.getRecords
json <- jawn.parseByteBuffer(record.getData).toOption
msg <- decode[T](json.toString).toOption
} yield subscriber ! msg
processRecordsInput.getCheckpointer.checkpoint()
}
代码解析消息并将其发送给订阅者。目前,我只是将所有消息标记为成功接收。我可以在AWS Kinesis仪表板上看到消息被发送,但没有读取发生,可能是因为每个应用程序都有自己的AppName,并且不会看到其他消息。
Kinesis
就是为了这个目的而构建的。我有 4 个Lambda
函数,每个函数以不同的方式处理记录,但都消费了完全相同的记录。 - johni