AWS Glue ETL和PySpark与分区数据:如何从分区创建dataframe列

4

我有一些数据存储在S3桶中,包含许多类似于以下内容的json文件:

s3://bucket1/news/year=2018/month=01/day=01/hour=xx/

day 分区包含多个 hour=xx 分区,每个分区代表一天中的一个小时。我在 day 分区上运行 Glue ETL 作业,并创建了一个 Glue dynamic_frame_from_options。然后,我使用 ApplyMapping.apply 进行一些映射,效果非常好。

但是,我希望基于每个文件的分区创建一个新列,其中包含 hour 值。我可以使用 Spark 创建一个包含常量的新列,但是如何使此列使用分区作为源呢?

df1 = dynamicFrame.toDF().withColumn("update_date", lit("new column value"))

编辑1

AWS的一篇文章介绍了如何使用分区数据,其中在创建dynamicFrame之前使用了Glue爬虫,并从Glue目录中创建dynamicFrame。我需要直接从S3源创建dynamicFrame。 输入链接说明


https://aws.amazon.com/blogs/big-data/work-with-partitioned-data-in-aws-glue/ - Foxan Ng
谢谢,我已经看到这篇文章了。然而,在创建 dynamicFrame 之前,他们使用了 Glue 爬虫,然后从 Glue 目录创建了 dynamicFrame。我需要直接从 S3 源创建 dynamicFrame - Cactus
4个回答

3

我不太明白你需要做什么。如果你已经将文件分区,难道就没有一个hour值吗?或者只有当你使用create_dynamic_frame.from_catalog时才能得到它吗? 你可以使用df1["hour"]df1.select_fields["hour"]吗?

如果你的数据是按照ts(格式为yyyymmddhh的时间戳)进行分区,那么你不需要导入任何库,可以在Spark中使用纯Python完成这个操作。

示例代码。首先我创建了一些值来填充我的DataFrame。 然后创建一个如下的新变量。

df_values = [('2019010120',1),('2019010121',2),('2019010122',3),('2019010123',4)]
df = spark.createDataFrame(df_values,['yyyymmddhh','some_other_values'])
df_new = df.withColumn("hour", df["yyyymmddhh"][9:10])
df_new.show()
+----------+-----------------+----+
|yyyymmddhh|some_other_values|hour|
+----------+-----------------+----+
|2019010120|                1|  20|
|2019010121|                2|  21|
|2019010122|                3|  22|
|2019010123|                4|  23|
+----------+-----------------+----+

很不幸,这并不起作用,因为从选项中读取时,“hour”列为空。这就是我的问题。 - Cactus
你想用什么填充hour?哪个小时? - Developer

0

我对AWS Glue不太熟悉,如果给定的链接对你的情况无效,那么你可以尝试以下解决方法:

使用input_file_name获取文件名,然后使用regexp_extract从文件名中获取你想要的分区列:

from pyspark.sql.functions import input_file_name, regexp_extract

df2 = df1.withColumn("hour", regexp_extract(input_file_name(), "hour=(.+?)/", 1))

这个方法很好,但是只有在使用create_dynamic_frame.from_catalog函数创建动态框架时才能使用input_file_name参数。我需要使用create_dynamic_frame_from_options从S3数据创建动态框架。谢谢。 - Cactus

0
根据我的理解,您想要为给定日期构建数据框,并以小时作为分区。通常,如果您使用类似Apache Hive的分区路径,并且您的文件具有相同的架构,则使用起来不应该有问题。
ds = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': ['s3://bucket1/news/year=2018/month=01/day=01/']},
    'json')

或者...

df = spark.read.option("mergeSchema", "true").json('s3://bucket1/news/year=2018/month=01/day=01/')

如果它不能工作,您应该检查是否使用了Apache Hive风格的分区路径并且您的文件具有相同的架构。

您还可以尝试在Glue中使用boto3框架(可能对您有用):

import boto3
s3 = boto3.resource('s3')

有用的链接:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html


这看起来很不错。但是,我需要path内部的分区['s3://bucket1/news/year=2018'],因此我使用了recursive=true标志。然后,分区不会出现在数据框中。 - Cactus

0
"...AWS Glue不会在DynamicFrame中包含分区列,它只包含数据。"
我们需要将S3键加载到新列中,并通过编程方式解码分区以创建我们想要的列到Dynamic Frame/Data Frame中。一旦创建完成,我们可以根据需要使用它们。
附:我已经对parquet文件进行了测试。它不适用于JSON文件。 参考

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接