Kinesis消费者返回空记录(boto,Python)

4

我在检查写入Kinesis的数据时遇到了问题。看起来以下示例应该可以工作,但我从get_records(在Records字段中)返回了一个空列表。有什么想法可能出了什么问题吗?

import uuid
import boto3
import time


streamname = 'mytestStream'
session = boto3.session.Session() 
kinesis_client = session.client('kinesis', region_name='us-east-1')


##### WRITE TO KINESIS

partitionkey = str(uuid.uuid4())[:8]
put_response = kinesis_client.put_record(StreamName=streamname,Data='mytestdata',PartitionKey=partitionkey)

time.sleep(5)


##### READ FROM KINESIS

shard_id = kinesis_client.describe_stream(StreamName=streamname)['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=streamname, ShardId=shard_id, ShardIteratorType="LATEST")["ShardIterator"]
data_from_kinesis = kinesis_client.get_records(ShardIterator=shard_iterator)

谢谢!

1个回答

4
如果您使用的是最新检查点,应先阅读流,然后放置记录。在您的示例中,时间线如下:
- 在t0时刻:流中的最新检查点为101。 - 在t1时刻(主线程):您将记录放入流中,并且该记录位于检查点102。 - 在t2时刻(主线程):您从LATEST点开始跟踪流,该点为103。
要解决此问题,您应该在不同的线程中运行生产者和消费者。正确的流程应如下:
- 在t0时刻(消费者线程):从LATEST位置开始跟踪流,该位置为201。 - 在t1时刻(生产者线程):您将记录放入流中,并且该记录被放置在检查点202上。 - 在t2时刻(消费者线程):由于服务器端的分片向前移动了(因为您刚刚添加了数据),并且您自从检查点201以来一直在跟踪该分片,因此您迭代新的检查点202并显示您的数据。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接