我有一些数据每天会被倾倒到 s3://<bucket>/mydata/year=*/month=*/*.snappy.parquet,作为该月的累积数据。我有一个爬虫用于更新 mydata 表格,并且有一个 CW 规则,当爬虫成功时调用 Lambda 启动 Glue 作业,将列进行转换并输出到 s3://<bucket>/mydata-transformed/year=*/month=*/*.snappy.parquet。这个流程基本上是有效的。然而,我目前遇到的问题是输出数据被累加写入,而不是替换原有数据(因为它是该月的累积数据)。例如,假设在2020年10月1日午夜,数据被倾倒到 s3://<bucket>/mydata/year=2020/month=10/*.snappy.parquet。该流程将在 s3://<bucket>/mydata-transformed/year=2020/month=10/*.snappy.parquet 中生成转换后的数据,对于10/1的数据来说一切都很好。然而,第二天当10/1和10/2的数据被倾倒到 s3://<bucket>/mydata/year=2020/month=10/*.snappy.parquet(覆盖了前一天的文件)时,Glue 作业将在输出文件夹中生成累加数据,即它将包含昨天的运行数据以及今天的运行数据(因此10/1的数据会出现两次,而10/2的数据则只有一次)。第二天,10/1的数据会出现3次,10/2的数据则出现2次,10/3的数据则出现1次。2020年09月及之前的数据是没有问题的,因为它们没有更改。以下是我的代码基本结构,省略了样板代码并用虚构代码替换了真实的转换代码。
我该怎么做才能使得在当前月份,前一天的数据被删除并用当前任务的数据替换呢?有没有办法知道,例如,在我的例子中,源数据中的分区month=10已经改变,因此我可以在进行转换和输出新数据之前清除相同的分区?
谢谢。
[编辑] 所以看起来一个解决方案是获取作业书签,然后使用CURR_LATEST_PARTITIONS值确定应该在处理数据之前删除哪个分区。在我的情况下,当我处理2020/10时,CURR_LATEST_PARTITIONS为2020/09。因此,我知道要删除2020/10的数据,因为如果CURR_LATEST_PARTITIONS为2020/09,则必须处理该数据。我不太喜欢这个解决方案,但我认为它会起作用,而且我不确定我还能做什么。
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
Transform0 = DynamicFrame.fromDF(ds_df1, glueContext, "Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://<bucket>/mydata-transformed/", "partitionKeys": ["year","month"]}, transformation_ctx = "DataSink0")
job.commit()
我该怎么做才能使得在当前月份,前一天的数据被删除并用当前任务的数据替换呢?有没有办法知道,例如,在我的例子中,源数据中的分区month=10已经改变,因此我可以在进行转换和输出新数据之前清除相同的分区?
谢谢。
[编辑] 所以看起来一个解决方案是获取作业书签,然后使用CURR_LATEST_PARTITIONS值确定应该在处理数据之前删除哪个分区。在我的情况下,当我处理2020/10时,CURR_LATEST_PARTITIONS为2020/09。因此,我知道要删除2020/10的数据,因为如果CURR_LATEST_PARTITIONS为2020/09,则必须处理该数据。我不太喜欢这个解决方案,但我认为它会起作用,而且我不确定我还能做什么。