通过Amazon Kinesis Firehose流写入S3的数据如何读取?

33

我正在将记录写入Kinesis Firehose流,最终由Amazon Kinesis Firehose写入S3文件。

我的记录对象看起来像

ItemPurchase {
    String personId,
    String itemId
}

写入 S3 的数据如下:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

没有逗号分隔。

没有起始括号,就像 Json 数组一样。

[

没有结束括号,就像一个Json数组

]

我想要阅读这些数据并获取一个ItemPurchase对象列表。

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

如何正确地阅读这些数据?


Kinesis以这种奇怪的格式传递数据,以便可以通过Athena进行查询链接。奇怪的是,为什么他们一开始就固定了这种格式。 - sjain24
13个回答

0
您可以使用以下脚本。
如果流式数据大小未超过您设置的缓冲区大小,则每个S3文件都有一对方括号([])和逗号。
import base64

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')+',\n'

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8'))
        }
        output.append(output_record)

    last = len(event['records'])-1
    print('Successfully processed {} records.'.format(len(event['records'])))
    
    start = '['+base64.b64decode(output[0]['data']).decode('utf-8')
    end = base64.b64decode(output[last]['data']).decode('utf-8')+']'
    
    output[0]['data'] = base64.b64encode(start.encode('utf-8'))
    output[last]['data'] = base64.b64encode(end.encode('utf-8'))
    return {'records': output}


0
在Spark中,我们遇到了同样的问题。我们正在使用以下内容:
from pyspark.sql.functions import *

@udf
def concatenated_json_to_array(text):
  final = "["
  separator = ""
  
  for part in text.split("}{"):
    final += separator + part
    separator = "}{" if re.search(r':\s*"([^"]|(\\"))*$', final) else "},{"
      
  return final + "]"


def read_concatenated_json(path, schema):
  return (spark.read
          .option("lineSep", None)
          .text(path)
          .withColumn("value", concatenated_json_to_array("value"))
          .withColumn("value", from_json("value", schema))
          .withColumn("value", explode("value"))
          .select("value.*"))  

它的工作原理如下:

  1. 每个文件将数据读取为一个字符串(没有分隔符!)
  2. 使用UDF引入JSON数组,并通过引入逗号拆分JSON对象。注意:要小心不要破坏其中任何带有}{的字符串!
  3. 使用模式解析JSON到DataFrame字段中。
  4. 将数组展开为单独的行。
  5. 将值对象扩展到列中。

像这样使用它:

from pyspark.sql.types import *

schema = ArrayType(
  StructType([
    StructField("type", StringType(), True),
    StructField("value", StructType([
      StructField("id", IntegerType(), True),
      StructField("joke", StringType(), True),
      StructField("categories", ArrayType(StringType()), True)  
    ]), True)
  ])
)

path = '/mnt/my_bucket_name/messages/*/*/*/*/'
df = read_concatenated_json(path, schema)

我在这里写了更多的细节和注意事项:使用Spark解析来自S3(Kinesis)的JSON数据。不要仅仅通过}{进行分割,因为它可能会破坏你的字符串数据!例如:{ "line": "a\"r}{t" }


0

使用 JavaScript 正则表达式。

JSON.parse(`[${item.replace(/}\s*{/g, '},{')}]`);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接