如何使用AWS Glue作业覆盖过期的分区数据?

3
我有一些数据每天会被倾倒到 s3://<bucket>/mydata/year=*/month=*/*.snappy.parquet,作为该月的累积数据。我有一个爬虫用于更新 mydata 表格,并且有一个 CW 规则,当爬虫成功时调用 Lambda 启动 Glue 作业,将列进行转换并输出到 s3://<bucket>/mydata-transformed/year=*/month=*/*.snappy.parquet。这个流程基本上是有效的。然而,我目前遇到的问题是输出数据被累加写入,而不是替换原有数据(因为它是该月的累积数据)。例如,假设在2020年10月1日午夜,数据被倾倒到 s3://<bucket>/mydata/year=2020/month=10/*.snappy.parquet。该流程将在 s3://<bucket>/mydata-transformed/year=2020/month=10/*.snappy.parquet 中生成转换后的数据,对于10/1的数据来说一切都很好。然而,第二天当10/1和10/2的数据被倾倒到 s3://<bucket>/mydata/year=2020/month=10/*.snappy.parquet(覆盖了前一天的文件)时,Glue 作业将在输出文件夹中生成累加数据,即它将包含昨天的运行数据以及今天的运行数据(因此10/1的数据会出现两次,而10/2的数据则只有一次)。第二天,10/1的数据会出现3次,10/2的数据则出现2次,10/3的数据则出现1次。2020年09月及之前的数据是没有问题的,因为它们没有更改。以下是我的代码基本结构,省略了样板代码并用虚构代码替换了真实的转换代码。
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
Transform0 = DynamicFrame.fromDF(ds_df1, glueContext, "Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://<bucket>/mydata-transformed/", "partitionKeys": ["year","month"]}, transformation_ctx = "DataSink0")
job.commit()

我该怎么做才能使得在当前月份,前一天的数据被删除并用当前任务的数据替换呢?有没有办法知道,例如,在我的例子中,源数据中的分区month=10已经改变,因此我可以在进行转换和输出新数据之前清除相同的分区?
谢谢。
[编辑] 所以看起来一个解决方案是获取作业书签,然后使用CURR_LATEST_PARTITIONS值确定应该在处理数据之前删除哪个分区。在我的情况下,当我处理2020/10时,CURR_LATEST_PARTITIONS为2020/09。因此,我知道要删除2020/10的数据,因为如果CURR_LATEST_PARTITIONS为2020/09,则必须处理该数据。我不太喜欢这个解决方案,但我认为它会起作用,而且我不确定我还能做什么。

3
由于我没有足够的声望来发表评论,所以在回答中添加内容以解决有关已接受答案中第三个选项purge_s3_path的评论中提出的问题。它可能无法正常工作,因为默认情况下“保留期”为7天,这意味着任何新于168小时的内容都不会被purge_s3_path删除。因此,如果您希望删除路径,则需要将保留期指定为零,如下所示:glueContext.purge_s3_path('s3://s3_path/bucket',options={"retentionPeriod":0})。 - Deepak Gaur
@Deepak Gaur,谢谢!你的答案起作用了。 - sevencontinents
@DeepakGaur 我尝试将保留期设置为0。脚本成功运行,但是S3路径中的文件并没有被删除。有什么原因吗? - prathik vijaykumar
3个回答

6

您有几个选项:

  1. DynamicFrameWriter 尚未支持在S3中覆盖数据。您可以使用Spark本机的 write()替代。但是,对于非常大的数据集来说,这可能效率有点低,因为只会使用一个工作节点来覆盖现有的S3数据。以下是一个示例:
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
    ds_df = DataSource0.toDF()
    ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
    ds_df1 \
        .write.mode('overwrite') \
        .format('parquet') \
        .partitionBy('year', 'month') \
        .save('s3://<bucket>/mydata-transformed/')
    
    job.commit()
  1. 在lambda函数中,您可以使用Python和boto3删除S3下特定前缀的数据。以下是一个示例:
    import boto3
    
    s3_res = boto3.resource('s3')
    bucket = 'my-bucket-name'
    # Add any logic to derive required prefix based on year/month/day
    prefix = 'mydata/year=2020/month=10/'
    s3_res.Bucket(bucket).objects.filter(Prefix=key).delete()
  1. 您可以使用 Glue 的 purge_s3_path 方法删除特定前缀下的数据。链接在这里

1
谢谢!我无法让purge_s3_path正常工作,但在此之后,我又添加了以下内容,第一个解决方案对我有用:spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") - sevencontinents
purge_s3_path()对我也不起作用。不知道为什么。 - Aseem
purge_s3_path 对我来说运行良好。您需要将保留期设置为零。 glueContext.purge_s3_path( f"s3://{args['s3_bucket']}/archive/resources/date={today}", options={"retentionPeriod": 0}) - thumper

3

您可以使用purge_s3_path。

请注意,它不会直接起作用,因为默认的“保留期”为7天,这意味着任何新于168小时的内容都不会被purge_s3_path删除。因此,如果您想要删除路径,需要将其保留期指定为零,如下所示:

glueContext.purge_s3_path('s3://s3_path/bucket', options={"retentionPeriod":0})

3
现在Glue中存在一个函数,可以删除S3路径或删除Glue目录表。 AWS Glue文档

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接