如何在Spark 2中解压LZ4 JSON

3

我从https://censys.io/下载了一个xxxx.json.lz4文件,但是当我试图使用以下代码读取文件时,输出数据为0。

metadata_lz4 = spark.read.json("s3n://file.json.lz4")

虽然手动解压缩正常并且可以导入Spark,但它不返回任何结果。

我也尝试过

val metadata_lz4_2 = spark.sparkContext.newAPIHadoopFile("s3n://file.json.lz4", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

同样也没有返回结果。

我有多个这样的文件,每个文件大小为100GBs,因此非常希望不必手动解压每个文件。

有什么好的想法吗?


虽然手动解压工作正常,但你能告诉我如何手动操作吗? - mjbsgll
2个回答

3
根据这个未解决的问题,Spark LZ4解压缩器使用的规范与标准LZ4解压缩器不同。因此,在apache-spark中解决此问题之前,您将无法使用Spark LZ4来解压缩标准LZ4压缩文件。

我认为我们的Lz4Codec实现在创建基于文本的文件时实际上没有使用FRAME规范(http://cyan4973.github.io/lz4/lz4_Frame_format.html)。看起来它是作为一种编解码器添加到块压缩格式(如SequenceFiles/HFiles等)中使用的,但并非面向文本文件,或者是在LZ4没有FRAME规范的时候引入的。

因此,从根本上讲,我们与lz4实用程序不兼容。这种差异类似于GPLExtras的LzoCodec与LzopCodec之间的差异,前者仅是数据压缩算法,而后者是实际的帧格式,可与lzop CLI实用程序互操作。

为了使自己能够互操作,我们需要引入一个新的框架包装编解码器,例如LZ4FrameCodec,当用户想要解压缩或压缩由lz4/lz4cat CLI实用程序生成/可读的文本数据时,他们可以使用该编解码器。


1
我成功地在Pyspark中实现了对lz4压缩的解析,方法如下:

import lz4.frame
import pyspark.sql.functions as F
from pyspark.sql import SparkSession


spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext

list_paths = ['/my/file.json.lz4', '/my/beautiful/file.json.lz4']

rdd = sc.binaryFiles(','.join(list_paths))
df = rdd.map(lambda x: lz4.frame.decompress(x[1])).map(lambda x: str(x)).map(lambda x:  (x, )).toDF()

对于非复杂对象,这通常已经足够了。但如果您解析的压缩JSON具有嵌套结构,则需要在调用函数F.from_json()之前对解析文件进行额外清理:

schema = spark.read.json("/my/uncompressed_file.json").schema

df = df.select(F.regexp_replace(F.regexp_replace(F.regexp_replace(F.regexp_replace(F.regexp_replace("_1", "None", "null"), "False", "false"), "True", "true"), "b'", ""), "'", "").alias("json_notation"))
result_df = df.select(F.from_json("json_notation", schema)

其中/my/uncompressed_file.json是您之前解压缩的/my/file.json.lz4(除非您想手动提供架构,如果不太复杂,它仍然可以工作)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接