使用 AWS CLI 如何从 Kinesis 流中读取数据?

3

我在AWS上拥有一个Kinesis流,可以使用kinesis命令将数据(JSON)发送到它,并可以从流中获取数据:

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name mystream --query 'ShardIterator' --profile myprofile)
aws kinesis get-records --shard-iterator $SHARD_ITERATOR --profile myprofile

Output of this looks like something like:

HsKCQkidmlkZW9Tb3VyY2UiOiBbCgkJCXsKCQkJCSJicmFuZGluZyI6IHt9LAoJCQkJInByb21vUG9vbCI6IFtdLAoJCQkJImlkIjogbnVsbAoJCQl9CgkJXSwKCQkiaW1hZ2VTb3VyY2UiOiB7fSwKCQkibWV0YWRhdGFBcHByb3ZlZCI6IHRydWUsCgkJImR1ZURhdGUiOiAxNTgzMzEyNTA0ODAzLAoJCSJwcm9maWxlIjogewoJCQkiY29tcG9uZW50Q291bnQiOiAwLAoJCQkibmFtZSI6ICJTUUVfQVRfUFJPRklMRSIsCgkJCSJpZCI6ICJTUUVfQVRfUFJPRklMRV9JRCIsCgkJCSJwYWNrYWdlQ291bnQiOiAwLAoJCQkicGFja2FnZXMiOiBbCgkJCQl7CgkJCQkJIm5hbWUiOiAiUEVBQ09DSy1MVEEiLAoJCQkJCSJpZCI6ICJmZDk5NTRmZC03NDYwLTRjZjItOTU5Ni05YzBhMjcxNTViODgiCgkJCQl9CgkJCV0KCQl9LAoJCSJ3b3JrT3JkZXJJZCI6ICJTUUVfQVRfSk9CX1NVQk1JU1

我应该如何以原始格式(看起来像JSON)获取实际的JSON消息 - 就像我发送它时一样?

谢谢


请查看我的答案。如果这是您正在寻找的内容,请留下一个赞并接受它作为解决方案。 - Rajan Prasad
2个回答

5
根据文档,您需要使用Base64解码工具或使用KCL库以接收原始格式的数据:
在本教程的这个部分,您首先可能会注意到关于记录的一件事情是,数据看起来像垃圾 - 它不是我们发送的明文测试数据。这是因为put-record使用Base64编码使您能够发送二进制数据。然而,AWS CLI中的Kinesis Data Streams支持不提供Base64解码,因为将Base64解码转换为打印到stdout上的原始二进制内容可能会导致某些平台和终端上的不良行为和潜在安全问题。如果您使用Base64解码器(例如https://www.base64decode.org/)手动解码dGVzdGRhdGE=,您将看到它实际上是testdata。这对于本教程已足够,因为在实践中,AWS CLI很少用于消耗数据,而更多地用于监控流的状态并获取信息,如先前所示(describe-stream和list-streams)。未来的教程将向您展示如何使用Kinesis Client Library(KCL)构建生产质量的消费者应用程序,其中Base64已经被处理好了。有关KCL的更多信息,请参阅开发KCL 1.x消费者。

1
在AWS CLI(shell)中应使用哪些命令来将数据转码为常规的JSON格式? - Joe
@Joe,据我所知,AWS cli本身并没有提供任何用于转码为JSON的命令或参数。正如在这里粘贴的AWS文档中也提到的那样。 - Rajan Prasad
@Joe,你想以编程方式解决它还是为了手动查看的目的? - Rajan Prasad

3
在Unix系统中,您可以使用base64 --decode命令来解码Base64编码的Kinesis记录数据。
例如,要解码第一条记录的数据:
# define the name of the stream you want to read
KINESIS_STREAM_NAME='__your_stream_name_goes_here__';

# define the shard iterator to use
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name $KINESIS_STREAM_NAME --query 'ShardIterator');

# read the records, use `jq` to grab the data of the first record, and base64 decode it 
aws kinesis get-records --shard-iterator $SHARD_ITERATOR | jq -r '.Records[0].Data' | base64 --decode

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接