使用Delta编码列,编写一个Parquet文件。

4
我试图使用Delta编码将Parquet文件写入。 此页面指出,Parquet支持三种类型的Delta编码:
    (DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY).

由于 sparkpyspark 或者 pyarrow 不允许我们指定编码方式,我很好奇如何启用带有delta编码的文件写入?

然而,我在互联网上发现,如果我有TimeStamp类型的列,parquet将使用delta编码。因此,我在scala中使用以下代码创建了一个parquet文件。但是编码不是delta。


    val df = Seq(("2018-05-01"),
                ("2018-05-02"),
                ("2018-05-03"),
                ("2018-05-04"),
                ("2018-05-05"),
                ("2018-05-06"),
                ("2018-05-07"),
                ("2018-05-08"),
                ("2018-05-09"),
                ("2018-05-10")
            ).toDF("Id")
    val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
    val df3 = df2.withColumn("Date", (col("Id").cast("date")))

    df3.coalesce(1).write.format("parquet").mode("append").save("date_time2")
< p > parquet-tools 显示了关于已写入的 parquet 文件的以下信息。 < /p >
file schema: spark_schema 
--------------------------------------------------------------------------------
Id:          OPTIONAL BINARY L:STRING R:0 D:1
Timestamp:   OPTIONAL INT96 R:0 D:1
Date:        OPTIONAL INT32 L:DATE R:0 D:1

row group 1: RC:31 TS:1100 OFFSET:4 
--------------------------------------------------------------------------------
Id:           BINARY SNAPPY DO:0 FPO:4 SZ:230/487/2.12 VC:31 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 2018-05-01, max: 2018-05-31, num_nulls: 0]
Timestamp:    INT96 SNAPPY DO:0 FPO:234 SZ:212/436/2.06 VC:31 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
Date:         INT32 SNAPPY DO:0 FPO:446 SZ:181/177/0.98 VC:31 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 2018-05-01, max: 2018-05-31, num_nulls: 0]

可以看到,没有任何列使用了增量编码。

我的问题是:

  1. 如何使用增量编码写入parquet文件?(如果您能提供一个示例代码,使用scalapython编写将会非常好。)

  2. 如何决定使用哪种“增量编码”:(DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY)

1个回答

7

使用PySpark生成parquet文件时启用DELTA编码真的很具有挑战性。

我们生成了大量的数字数据,使用DELTA编码可以获得很大的好处。在我的测试中,我能够将一个小的测试文件(136.9MB)使用DELTA编码减少到101.6MB。对于我们的用例,我们生成TB级别的数据,因此未来的S3节省值得考虑。

我的经验是使用Spark 2.4.5和EMR 5.29.0。在生成DELTA编码文件之前和之后,我遇到了许多问题。我将提到它们,以便您意识到这些问题并避免犯错。

为了在PySpark中生成DELTA编码parquet文件,我们需要启用Parquet write的第2个版本。这是唯一可行的方法。此外,由于某种原因,该设置仅在创建Spark上下文时起作用。设置为:

"spark.hadoop.parquet.writer.version": "v2"

结果如下:

time: INT64 GZIP DO:0 FPO:11688 SZ:84010/2858560/34.03 VC:15043098 ENC:DELTA_BINARY_PACKED ST:[min: 1577715561210, max: 1577839907009, num_nulls: 0]

然而,如果您直接在PySpark中读取相同的文件,您将会得到以下错误:java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_BINARY_PACKED

为了能够读取该文件,您需要禁用以下配置:spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

另外,令人遗憾的是,截至本文撰写时,Pandas无法读取这些文件。

Parquet files v2.0 created by spark can't be read by pyarrow


谢谢@charu。抱歉回复晚了。这个可行。按照您的指示,我设置了“spark.hadoop.parquet.writer.version”:“v2”。- Omega Gamage - Omega Gamage
fastparquet 正在 https://github.com/dask/fastparquet/pull/602 中添加 DELTA 读取支持。它已经成功处理来自 https://github.com/apache/parquet-testing/tree/master/data 的 datapage_v2.snappy.parquet 数据。 - mdurant

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接