如何在pyspark dataframe的读取方法中包含分区列

3
我正在从Parquet文件中编写基于Avro的文件。以下是我读取文件的代码:

读取数据

dfParquet = spark.read.format("parquet").option("mode", "FAILFAST")
    .load("/Users/rashmik/flight-time.parquet")

写入数据

我已经按照以下方式以Avro格式编写了文件:

dfParquetRePartitioned.write \
    .format("avro") \
    .mode("overwrite") \
    .option("path", "datasink/avro") \
    .partitionBy("OP_CARRIER") \
    .option("maxRecordsPerFile", 100000) \
    .save()

作为预期结果,我的数据已按OP_CARRIER分区。

从特定分区读取Avro分区数据

在另一个作业中,我需要从上述作业的输出中读取数据,即从datasink/avro目录中读取。我正在使用以下代码从datasink/avro中读取:

dfAvro = spark.read.format("avro") \
    .option("mode","FAILFAST") \
    .load("datasink/avro/OP_CARRIER=AA")

它成功读取数据,但是如预期,在dfAvro数据帧中没有可用的OP_CARRIER列,因为它是第一个作业的分区列。现在我的要求是在第二个数据帧即dfAvro中也包括OP_CARRIER字段。有人可以帮我吗? 我正在参考Spark文档的文档,但我无法找到相关信息。任何指针都将非常有帮助。

.load("datasink/avro") - Lamanus
1个回答

0

您可以使用不同的别名复制相同的列值。

dfParquetRePartitioned.withColumn("OP_CARRIER_1", lit(df.OP_CARRIER)) \
.write \
.format("avro") \
.mode("overwrite") \
.option("path", "datasink/avro") \
.partitionBy("OP_CARRIER") \
.option("maxRecordsPerFile", 100000) \
.save()

这将给你想要的东西。但是使用不同的别名。 或者您也可以在读取期间执行此操作。如果位置是动态的,则可以轻松地追加列。

path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=") 
dfAvro = spark.read.format("avro") \
.option("mode","FAILFAST") \
.load(path).withColumn(newcol[0], lit(newcol[1]))

如果该值是静态的,在数据读取期间添加它要容易得多。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接