向现有的Parquet文件追加一个新列

14
有没有办法向现有的parquet文件中追加新列?
我目前正在参加kaggle比赛,将所有数据转换为parquet文件。
这是情况,我将parquet文件读入pyspark DataFrame,并进行了一些特征提取,并使用pysaprk.DataFrame.withColumn()向DataFrame添加了新列。
之后,我想将新列保存到源parquet文件中。
我知道Spark SQL附带Parquet schema evolution,但示例只显示了一个键值的情况。 parquet“append”模式也行不通。它只会将新行附加到parquet文件中。 是否有任何方法可以将新列追加到现有的parquet文件中,而不是再次生成整个表格? 或者我必须生成一个单独的新parquet文件,并在运行时将它们连接起来。

如果从架构上看,向现有的Parquet文件追加新列是不可能的。这就像在玩弄Parquet文件的元数据。 - Aviral Kumar
虽然你可以尝试通过首先更改模式来重写它,但我不太确定在spark-sql中会发生什么。 - Aviral Kumar
是的,在spark-sql中更改模式很容易,但覆盖整个parquet文件很耗费时间,这意味着我必须重新计算整个表。感谢您的评论,@AviralKumar。 - Chu-Yu Hsu
3个回答

7
在 Parquet 中,您不能修改文件,而是需要读取它们、修改它们并将它们写回,您不能只更改一列,需要读取并写入整个文件。

6
尽管这个问题已经发布了2年,但仍然没有答案。让我自己回答这个问题。
在我使用Spark的时候,版本是1.4。对于新版本,我不确定,但是对于那个版本,在Parquet文件中添加一个新列是不可能的。

关于Spark版本2.4.4怎么样?我正在使用Pyspark创建Parquet文件。现在我需要在现有的Parquet文件中添加一列。我该如何做到这一点? - Vikram Ranabhatt

2

使用 Databricks Deltaparquet 表格都可以实现。以下是一个示例:

这个示例是用 python (pySpark)编写的。

df = sqlContext.createDataFrame([('1','Name_1','Address_1'),('2','Name_2','Address_2'),('3','Name_3','Address_3')], schema=['ID', 'Name', 'Address'])

delta_tblNm = 'testDeltaSchema.test_delta_tbl'
parquet_tblNm = 'testParquetSchema.test_parquet_tbl'

delta_write_loc = 'dbfs:///mnt/datalake/stg/delta_tblNm'
parquet_write_loc = 'dbfs:///mnt/datalake/stg/parquet_tblNm'


# DELTA TABLE
df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(delta_write_loc)
spark.sql(" create table if not exists {} using DELTA LOCATION '{}'".format(delta_tblNm, delta_write_loc))
spark.sql("refresh table {}".format(print(cur_tblNm)))

# PARQUET TABLE
df.write.format("parquet").mode("overwrite").save(parquet_write_loc)
spark.sql("""CREATE TABLE if not exists {} USING PARQUET LOCATION '{}'""".format(parquet_tblNm, parquet_write_loc))
spark.sql(""" REFRESH TABLE {} """.format(parquet_tblNm))

test_df = spark.sql("select * testDeltaSchema.test_delta_tbl")
test_df.show()

test_df = spark.sql("select * from testParquetSchema.test_parquet_tbl")
test_df.show()

test_df = spark.sql("ALTER TABLE  testDeltaSchema.test_delta_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()

test_df = spark.sql("ALTER TABLE  testParquetSchema.test_parquet_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接