我成功地在Pyspark中实现了对lz4压缩的解析,方法如下:
import lz4.frame
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext
list_paths = ['/my/file.json.lz4', '/my/beautiful/file.json.lz4']
rdd = sc.binaryFiles(','.join(list_paths))
df = rdd.map(lambda x: lz4.frame.decompress(x[1])).map(lambda x: str(x)).map(lambda x: (x, )).toDF()
对于非复杂对象,这通常已经足够了。但如果您解析的压缩JSON具有嵌套结构,则需要在调用函数F.from_json()
之前对解析文件进行额外清理:
schema = spark.read.json("/my/uncompressed_file.json").schema
df = df.select(F.regexp_replace(F.regexp_replace(F.regexp_replace(F.regexp_replace(F.regexp_replace("_1", "None", "null"), "False", "false"), "True", "true"), "b'", ""), "'", "").alias("json_notation"))
result_df = df.select(F.from_json("json_notation", schema)
其中/my/uncompressed_file.json
是您之前解压缩的/my/file.json.lz4
(除非您想手动提供架构,如果不太复杂,它仍然可以工作)。