AWS Kinesis消费者Python 3.4 Boto

3

我正在尝试使用Python 3.4构建Kinesis消费者脚本,以下是我的代码示例。我希望记录保存到本地文件中,以便稍后推送到S3:

from boto import kinesis
import time
import json

# AWS Connection Credentials
aws_access_key = 'your_key'
aws_access_secret = 'your_secret key'

# Selected Kinesis Stream
stream = 'TwitterTesting'

# Aws Authentication
auth = {"aws_access_key_id": aws_access_key, "aws_secret_access_key": aws_access_secret}
conn = kinesis.connect_to_region('us-east-1',**auth)

# Targeted file to be pushed to S3 bucket
fileName = "KinesisDataTest2.txt"
file = open("C:\\Users\\csanders\\PycharmProjects\\untitled\\KinesisDataTest.txt", "a")

# Describe stream and get shards
tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    response = conn.describe_stream(stream)
    if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
        break
else:
    raise TimeoutError('Stream is still not active, aborting...')

# Get Shard Iterator and get records from stream
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']
    for shard_id in response['StreamDescription']['Shards']:
        shard_id = shard_id['ShardId']
        shard_iterator = conn.get_shard_iterator(stream,
        shard_id, 'TRIM_HORIZON')
        shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})
        tries = 0
        result = []
        while tries < 100:
            tries += 1
            response = conn.get_records(shard_iterator, 100)
            shard_iterator = response['NextShardIterator']
            if len(response['Records'])> 0:
                for res in response['Records']:
                    result.append(res['Data'])
                    print(result, shard_iterator)

由于某些原因,每次运行此脚本时,我都会收到以下错误提示:
Traceback (most recent call last):
  File "C:/Users/csanders/PycharmProjects/untitled/Get_records_Kinesis.py",  line 57, in <module>
    response = json.load(conn.get_records(shard_ids, 100))
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\layer1.py", line 327, in get_records
    body=json.dumps(params))
  File "C:\Python34\lib\site-packages\boto-2.38.0- py3.4.egg\boto\kinesis\layer1.py", line 874, in make_request
    body=json_body)
boto.exception.JSONResponseError: JSONResponseError: 400 Bad Request
{'Message': 'Start of list found where not expected', '__type':   'SerializationException'}

我的最终目标是将这些数据发送到S3存储桶中。首先,我只需要获取这些记录并打印出来。使用put_record函数将JSON Twitter数据转储到流中。如果需要,我也可以发布该代码。

将响应行从response = json.load(conn.get_records(shard_ids, 100))更改为response = conn.get_records(shard_iterator, 100)


在将内容存储到Kinesis之前,您是否对其进行了Base64编码? - garnaat
我对这些还是个新手,你能给我解释一下吗?如果有帮助的话,我也可以发布生产者脚本。我已经让它工作并流式传输到Kinesis了。 - Conned121
实际上,我认为问题在于您将一个字典数组作为“shard_iterator”参数传递,但Kinesis期望一个单个字符串值。 - garnaat
Boto在输入时为您执行base64编码,所以这很好。 - garnaat
我尝试将shard_iterator放入函数中,也尝试将其作为str放入,但仍然出现错误。 boto.exception.JSONResponseError: JSONResponseError: 400 Bad Request {'__type': 'SerializationException', 'Message': 'Start of structure or map found where not expected.'} - Conned121
显示剩余2条评论
2个回答

0
如果您进行以下替换,代码将正常工作("while"中的设置根据您想要收集多少记录,您可以使用无限制的 "with == 0" 并删除 "tries += 1")。
    shard_iterator = conn.get_shard_iterator(stream,
    shard_id, 'TRIM_HORIZON')
    shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})

使用以下内容:

    shard_iterator = conn.get_shard_iterator(stream,
    shard_id, "LATEST")["ShardIterator"]

同时,如果要将内容写入文件中,可以使用change("\n")来表示换行:
print(result, shard_iterator)

to:

file.write(str(result) + "\n")

希望能有所帮助。


0

输入一个shard_ID,出现了以下错误信息。我也尝试移除json.load: Traceback (most recent call last): InvalidArgumentException: 400 Bad Request {'message': 'Invalid ShardIterator.', '__type': 'InvalidArgumentException'} - Conned121
您是在第一个分片还是后面的分片上得到它的?迭代器在5分钟后会失效。 - Mircea
这是在第一个分片上。我有两个测试流,一个有一个分片,另一个有两个分片。无论使用哪个流,我都会得到相同的错误。将分片迭代器打印到控制台上没有问题。 - Conned121
不是 shard_id,而是 shard 迭代器(在你的代码中为 shard_iterator)。id 是 shard 名称,迭代器是流中实际指针。没有迭代器,你无法获取记录。 - Mircea
是的,我已经弄清楚了。我将其更改为response = conn.get_records(shard_iterator, 100)。在这种情况下,错误消息是boto.exception.JSONResponseError: JSONResponseError: 400 Bad Request {'Message': 'Start of structure or map found where not expected.', '__type': 'SerializationException'}。 - Conned121
已更新,请查看原问题。 - Conned121

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接