正如您在最近的评论中所总结的那样,firehose 中的 put_records_batch 要求您手动将分隔符放入记录中,以便消费者可以轻松解析。您可以添加一个新行或某些专用于解析的特殊字符,例如%,这些字符在有效负载中永远不应使用。
另一个选项是逐条发送记录。只有在您的用例不需要高吞吐量的情况下才可行。为此,您可以循环处理每个记录,并加载为字符串化的数据块。如果使用 Python,则我们将拥有一个名为“records”的字典,其中包含所有 json 对象。
import json
def send_to_firehose(records):
firehose_client = boto3.client('firehose')
for record in records:
data = json.dumps(record)
firehose_client.put_record(DeliveryStreamName=<your stream>,
Record={
'Data': data
}
)
默认情况下,Firehose会在将数据发送到您的存储桶之前对其进行缓冲,最终应该得到类似这样的结果。这将很容易解析并加载到您首选的数据结构中。
[
{
"metadata": {
"schema_id": "4096"
},
"payload": {
"zaza": 12,
"price": 20,
"message": "Testing sendnig the data in message attribute",
"source": "coming routing to firehose"
}
},
{
"metadata": {
"schema_id": "4096"
},
"payload": {
"zaza": 12,
"price": 20,
"message": "Testing sendnig the data in message attribute",
"source": "coming routing to firehose"
}
}
]
cloudwatch-logs > kinesis-firehose > s3
时遇到了相同的问题。有成功解决的吗? - lifeofguenter