如何读取无效的JSON格式Amazon Firehose数据

4
我遇到了一个非常可怕的情况,我想读取Kinesis Firehose在我们的S3上创建的文件。
Kinesis Firehose创建的文件不是每个JSON对象都在新行上,而是简单地将JSON对象连接在一起。
现在,这种情况不支持普通的JSON.parse,并且我已经尝试使用以下正则表达式:.scan(/({((\".?\":.?)*?)})/)。
但似乎只有没有嵌套括号的情况下扫描才有效。
有人知道解决这个问题的有效/更好/更优雅的方法吗?

1
我认为一个重要的项目是我们从火管 s3 文件中读取数据,这些数据会被导入到 redshift 中。 - Spons
我相信我们发送的是一个字符串化的JSON对象。这也可能解释了S3文件中的反斜杠。嗯,我需要为此构建一个测试。 - Spons
好的,我猜我明白问题出在哪里了。可能是你将整个JSON对象转换为字符串,然后将其作为单个事件推送到Firehose。 - mowienay
和@spons一样,当使用cloudwatch-logs > kinesis-firehose > s3时遇到了相同的问题。有成功解决的吗? - lifeofguenter
@lifeofguenter 是的和不是的。我最终为这种用例构建了一个非常好的解析器。由于我们正在运行一个高负载的实时应用程序,而且还没有时间从零开始构建一个干净的设置,所以我们使用了现有的环境。 - Spons
显示剩余8条评论
3个回答

2
最初的回答中的那个是用于处理未引用的 JSON,有时会出现这种情况。而这个:

({((\\?\".*?\\?\")*?)})

适用于带引号和不带引号的 JSON。
除此之外,还进行了一些改进,使其更简单化。由于具有双重捕获组,因此将忽略字符串字面值中的任何内容,以便您可以拥有整数和普通值。

https://regex101.com/r/kPSc0i/1


1

将输入修改为一个大的JSON数组,然后解析它:

input = File.read("input.json")
json = "[#{input.rstrip.gsub(/\}\s*\{/, '},{')}]"
data = JSON.parse(json)

你可能想要将前两个结合起来以节省一些内存:

json = "[#{File.read('input.json').rstrip.gsub(/\}\s*\{/, '},{')}]"
data = JSON.parse(json)

这假设在你的JSON编码数据中,}后面跟着一些空格,然后是{,从未出现在键或值中。

很遗憾,它不是一个以换行符分隔的 JSON。而且它也不能被解析,因为它只是一串连接在一起的 JSON 对象字符串。AWS 喜欢遵循标准,是吧?不是哈哈。 - Spons
@spons 我已经更新了我的答案,并添加了一个变体,希望在你的情况下能够起作用。 - Lars Haugseth

0

正如您在最近的评论中所总结的那样,firehose 中的 put_records_batch 要求您手动将分隔符放入记录中,以便消费者可以轻松解析。您可以添加一个新行或某些专用于解析的特殊字符,例如%,这些字符在有效负载中永远不应使用。

另一个选项是逐条发送记录。只有在您的用例不需要高吞吐量的情况下才可行。为此,您可以循环处理每个记录,并加载为字符串化的数据块。如果使用 Python,则我们将拥有一个名为“records”的字典,其中包含所有 json 对象。

import json
def send_to_firehose(records):
  firehose_client = boto3.client('firehose')
  for record in records:
    data = json.dumps(record)
    firehose_client.put_record(DeliveryStreamName=<your stream>,
                               Record={
                                       'Data': data
                                      }
                              )

默认情况下,Firehose会在将数据发送到您的存储桶之前对其进行缓冲,最终应该得到类似这样的结果。这将很容易解析并加载到您首选的数据结构中。

[
    {
        "metadata": {
            "schema_id": "4096"
        },
        "payload": {
            "zaza": 12,
            "price": 20,
            "message": "Testing sendnig the data in message attribute",
            "source": "coming routing to firehose"
        }
    },
    {
        "metadata": {
            "schema_id": "4096"
        },
        "payload": {
            "zaza": 12,
            "price": 20,
            "message": "Testing sendnig the data in message attribute",
            "source": "coming routing to firehose"
        }
    }
]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接