Spark:将DataFrame写入压缩JSON

25

Apache Spark的DataFrameReader.json()可以自动处理压缩的JSONlines文件,但似乎没有办法让DataFrameWriter.json()写入压缩的JSONlines文件。在云中进行额外的网络I/O非常昂贵。

有没有解决这个问题的方法?


你发现了一种压缩JSON输出的方法吗?我也在寻找解决方案。 - Rob Cowie
我还没有发现一种方法来做到这一点。 - Sim
3个回答

34

使用Spark 2.X(以及可能更早版本,我没有测试)有一种更简单的方法来编写压缩JSON,而不需要改变配置:

val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")

这也适用于CSV和Parquet,只需在设置压缩选项后使用.csv()和.parquet()代替.json()来写入文件。

可能的编解码器有:none、bzip2、deflate、gzip、lz4和snappy。


它似乎无法与Spark 2.X之前的早期版本一起使用。 - Alessandro Cosentino

17

设置SparkConf的压缩选项不是一个好的做法,就像被接受的答案所说。它改变了行为全局而不是在每个文件上指示设置。事实是,显式总比隐式要好。还有一些情况下,用户无法轻易地操作上下文配置,例如spark-shell或作为另一个子模块设计的代码。

正确的方法

从Spark 1.4开始支持使用压缩编写DataFrame。有多种方法可以实现:

第一种

df.write.json("filename.json", compression="gzip")

就这样!只需随心所欲地使用DataFrameWriter.json()

魔法隐藏在代码pyspark/sql/readwriter.py中。

@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
    """Saves the content of the :class:`DataFrame` in JSON format
    (`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
    specified path.

    :param path: the path in any Hadoop supported file system
    :param mode: ...

    :param compression: compression codec to use when saving to file. This can be one of the
                        known case-insensitive shorten names (none, bzip2, gzip, lz4,
                        snappy and deflate).
    :param dateFormat: ...
    :param timestampFormat: ...

    >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
    """
    self.mode(mode)
    self._set_opts(
        compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
    self._jwrite.json(path)

支持的压缩格式包括bzip2,gzip,lz4,snappy和deflate,不区分大小写。

Scala API应该是相同的。

另一个

df.write.options(compression="gzip").json("filename.json")

与上面类似。更多选项可以作为关键字参数提供。自 Spark 1.4 以来可用。

第三个

df.write.option("compression", "gzip").json("filename.json")

DataFrameWriter.option()自Spark 1.5版本以后加入。每次只能添加一个参数。


15
以下解决方案使用pyspark,但我认为Scala代码会类似。
第一种选项是在初始化SparkConf时设置以下内容:
conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

使用上面的代码,您使用该sparkContext生成的任何文件都将自动使用gzip压缩。

第二个选项是,如果您只想在上下文中压缩选定的文件。假设“df”是您的数据帧,而“filename”是您的目标:

df_rdd = self.df.toJSON() 
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

1
Scala RDD API 是 def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]),因此代码类应该直接传递,而不是作为字符串传递。 - Sim
想知道在将数据存储到文件时是否有可能避免使用Hadoop格式。我不能使用带有“_SUCCES”和“part-*”文件的目录。我只需要一个特定命名的单个文件... - lisak
抱歉关于这个问题的再次提问,但我很难相信conf.set("spark.hadoop.mapred.output.compression.codec", "true")是必要的。 - oarsome
DataFrame不是RDD。全局更改压缩设置使其隐式化也不是一个好的实践方法。 - ttimasdf

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接