通过Amazon Kinesis Firehose流写入S3的数据如何读取?

33

我正在将记录写入Kinesis Firehose流,最终由Amazon Kinesis Firehose写入S3文件。

我的记录对象看起来像

ItemPurchase {
    String personId,
    String itemId
}

写入 S3 的数据如下:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

没有逗号分隔。

没有起始括号,就像 Json 数组一样。

[

没有结束括号,就像一个Json数组

]

我想要阅读这些数据并获取一个ItemPurchase对象列表。

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

如何正确地阅读这些数据?


Kinesis以这种奇怪的格式传递数据,以便可以通过Athena进行查询链接。奇怪的是,为什么他们一开始就固定了这种格式。 - sjain24
13个回答

27

令人难以置信的是,Amazon Firehose以这种方式将JSON消息转储到S3中,并且不允许您设置分隔符或任何其他选项。

最终,我发现解决这个问题的方法是使用JSON raw_decode方法处理文本文件。

这将允许您在记录之间没有分隔符的情况下读取一堆串联的JSON记录。

Python代码:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:

    content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1

有点担心这个解决方案的时间复杂度。 - João Pedro Schmitt

6
我也遇到了同样的问题,以下是我的解决方法。
  1. replace "}{" with "}\n{"
  2. line split by "\n".

    input_json_rdd.map(lambda x : re.sub("}{", "}\n{", x, flags=re.UNICODE))
                  .flatMap(lambda line: line.split("\n"))
    
一个嵌套的json对象有几个“}”,所以按“}”分割行并不能解决问题。

4
我考虑过做类似的事情,但我认为如果 JSON 对象中的字符串之一恰好包含“}{”,那么这种技术将失效。也许,如果您遍历每个字符,在遇到“(表示进入或退出字符串)时切换布尔值,计算您所在对象的级数(在看到字符串外的"{ "增加,在看到字符串外的"}"减少),然后当级别计数器再次达到0时,考虑对象的结尾。 - Krenair
1
分隔符}{存在问题,因为内部字符串可能包含像这样的json:}{\"(带转义引号),因此使用}{"作为分隔符会更好一些,因为内部字符串不能有引号。 - Eran H.
为了在Eran的答案基础上进行改进,我使用了负向先行断言来处理字符串末尾出现}{的情况:re.sub('}{"(?![,}])', '}\n{"', string) - twhyte

3
如果火箭管的输入源是一个分析应用程序,那么这个没有分隔符的串联JSON是一个已知的问题,正如这里所引用的。您应该像这里一样拥有一个Lambda函数,输出多行JSON对象。

3

我曾遇到相同的问题。

如果AWS允许我们设置分隔符会更好,但我们可以自行操作。

在我的使用场景中,我一直在监听推特流,一旦收到新的推特,我就立即将其放入Firehose

当然,这导致了一个无法解析的单行文件。

因此,为了解决这个问题,我将推特的JSON与\n连接起来。

这样,我就能够使用一些包来输出读取流内容时的行,并轻松解析文件。

希望这可以帮助你。


3
我认为解决这个问题的最佳方法是首先创建一个格式正确、包含良好分离的JSON对象的JSON文件。在我的情况下,我在事件之间添加了逗号,并将其推送到Firehose中。然后,在文件保存到S3后,所有文件都将包含由某些分隔符(在我们的情况下为逗号)分隔的JSON对象。还必须添加"["和"]"以及文件开头和结尾。现在,您就可以拥有一个包含多个JSON对象的正确的JSON文件,可以对它们进行解析了。

这适用于JSON,但不适用于更复杂的标记语言,如XML。 如果每个记录都是一个XML文档,则需要解析它们并将根元素包装到新的XML文档中,并添加某种封闭元素(我使用了<array></array>)。我目前正在尝试弄清楚如何以这种方式从S3读取。 - Martynas Jusevičius
如果您有多个生产者向Firehose发送有效的JSON数组,则此方法将无效。 - alextsil

3
我使用了一个转换 Lambda 在每条记录的末尾添加了一个换行符。
def lambda_handler(event, context):
    output = []

    for record in event['records']:

        # Decode from base64 (Firehose records are base64 encoded)
        payload = base64.b64decode(record['data'])

        # Read json as utf-8    
        json_string = payload.decode("utf-8")

        # Add a line break
        output_json_with_line_break = json_string + "\n"

        # Encode the data
        encoded_bytes = base64.b64encode(bytearray(output_json_with_line_break, 'utf-8'))
        encoded_string = str(encoded_bytes, 'utf-8')

        # Create a deep copy of the record and append to output with transformed data
        output_record = copy.deepcopy(record)
        output_record['data'] = encoded_string
        output_record['result'] = 'Ok'

        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}

2
使用这段简单的 Python 代码。
input_str = '''{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}'''

data_str = "[{}]".format(input_str.replace("}{","},{"))
data_json = json.loads(data_str)

然后(如果您想要的话)转换为Pandas。原始答案翻译成"最初的回答"。
import pandas as pd   
df = pd.DataFrame().from_records(data_json)
print(df)

最初的回答

这是结果

itemId personId
0  i-111    p-111
1  i-222    p-222
2  i-333    p-333

1
如果有改变数据写入方式的方法,请用一行分隔所有记录,这样你就可以简单地逐行读取数据。如果没有,请建立一个扫描器对象,以“}”作为分隔符,并使用该扫描器进行读取。这样就可以完成任务。

1
您可以通过计算括号数量找到每个有效的JSON。假设文件以{开头,此Python片段应该有效:
import json

def read_block(stream):
    open_brackets = 0
    block = ''
    while True:
        c = stream.read(1)
        if not c:
            break

        if c == '{':
            open_brackets += 1
        elif c == '}':
            open_brackets -= 1

        block += c

        if open_brackets == 0:
            yield block
            block = ''


if __name__ == "__main__":
    c = 0
    with open('firehose_json_blob', 'r') as f:
        for block in read_block(f):
            record = json.loads(block)
            print(record)

2
警告:这只是一个盲目的流读取器,如果任何JSON块中包含已转义的括号字符串,则会出现错误。 - Tom Chapin

1
这个问题可以通过一个从流中逐个消耗对象的JSON解析器来解决。JSONDecoder的raw_decode方法就暴露了这样一个解析器,但我已经写了一个库,使得只需一行代码即可轻松实现这一点。请参考我的
from firehose_sipper import sip

for entry in sip(bucket=..., key=...):
    do_something_with(entry)

我在这篇博客文章中添加了更多细节。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接