AWS Glue 到 Redshift:是否可以替换、更新或删除数据?

30
这是我设置的一些要点:
  • 我上传了CSV文件到S3,并设置了Glue爬虫来创建表格和架构。
  • 我设置了一个Glue作业,使用JDBC连接将数据从Glue表格写入我们的Amazon Redshift数据库。该作业还负责映射列并创建Redshift表格。
重新运行作业后,Redshift中会出现重复行(符合预期)。但是,是否有一种方法可以在插入新数据之前使用键或Glue中的分区设置替换或删除行?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()

2
好问题,我现在也遇到了同样的问题。你到目前为止有取得任何进展吗? - Matthijs
3
我联系了AWS Glue支持团队并得到了一个解决方法。看起来Glue没有一种方法来完成这个任务,或者从未被设计用于这种类型的工作。我找到的工作解决方法是让Glue将所有行插入到一个临时表中,然后在Glue之外执行upsert/merge操作来实现。 - krchun
6个回答

21

作业书签是关键。只需编辑作业并启用“作业书签”,它就不会处理已经处理过的数据。 请注意,作业必须重新运行一次,然后它才能检测到不必再次重新处理旧数据。

有关更多信息,请参见: http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

在我看来,“书签”这个名称有点牵强。如果不是偶然在搜索过程中碰巧遇到它,我永远都不会注意到它。


1
我不太确定为什么你被踩了。工作书签相当于Spark中的检查点,这似乎是问题所在。 - Leyth G
2
我也不确定。我能想到的唯一原因是,重新运行相同的作业(例如清除书签)可能会导致 Redshift 中出现重复记录,因为批处理再次处理。 - Matthijs
3
你真的让它运行了吗?我知道它应该做你说的那样,但我无法让它正常工作。我的输入是一个目录表(由在S3上的Parquet数据集上爬虫创建),一个简单的映射步骤和Redshift作为数据汇。作业书签默认启用,并且所有作业运行也都启用了它,但每次运行仍会出现重复数据。 - andresp
2
是的,它对我有效。我有一个每天爬行的爬虫。然后使用选项--job-bookmark-option:job-bookmark-enable触发器(几个小时后,以便爬虫完成)。我们没有使用Parquet,不确定是否有所不同。总的来说,我对Glue的经验并不是很好:当作业过大时,作业会失败,我无法让自定义Python脚本工作。我们正在寻找替代方案。 - Matthijs
确实。我刚用JSON作为输入进行了测试,它可以正常工作。我已经向AWS报告了Parquet的错误。 - andresp
显示剩余3条评论

10
这是我从AWS Glue支持获得的解决方案:
如您所知,虽然可以创建主键,但Redshift不强制唯一性。因此,如果您重新运行Glue作业,则可能会插入重复行。保持唯一性的一些方法包括:
1. 使用临时表将所有行插入,然后执行upsert/merge [1]到主表中,这必须在glue之外完成。
2. 在您的Redshift表中添加另一列[1],例如插入时间戳,允许重复但知道哪一个先到或最后到,然后在需要时删除重复项。
3. 将先前插入的数据加载到数据帧中,然后将要插入的数据与数据进行比较,以避免插入重复项[3]。
[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.htmlhttp://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/ [2] - https://github.com/databricks/spark-redshift/issues/238 [3] - https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html

你检查了作业书签吗?如果你的源是S3,那可能就足够了。如果对你来说不起作用,我想知道你遇到了什么问题,这样我就不会犯同样的错误了。 - Matthijs
1
我刚刚尝试使用Glue,我的数据源和数据目的地都在Amazon Redshift中。启用书签并没有帮助我。数据存在重复。 - deepak.prathapani
是的,书签对我们也不是答案。不过我们已经不再使用AWS Glue了,所以很遗憾这就是我所能提供的了。 - krchun

7
请查看答案。其中有关于如何使用暂存表将数据upsert到Redshift的解释和代码示例。同样的方法也可用于在Glue写入数据之前或之后运行任何SQL查询,可以使用preactionspostactions选项:
// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "preactions" -> "<another SQL queries>",
    "postactions" -> "<some SQL queries>"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)

5
今天我已经测试并找到了一个使用JDBC连接更新/删除目标表的解决方法。
我使用了以下代码:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

import pg8000
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'PW',
    'HOST',
    'USER',
    'DB'
])
# ...
# Create Spark & Glue context

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ...
config_port = ****
conn = pg8000.connect(
    database=args['DB'], 
    user=args['USER'], 
    password=args['PW'],
    host=args['HOST'],
    port=config_port
)
query = "UPDATE table .....;"

cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()



query1 = "DELETE  AAA FROM  AAA A, BBB B WHERE  A.id = B.id"

cur1 = conn.cursor()
cur1.execute(query1)
conn.commit()
cur1.close()
conn.close()

我会测试一下,你尝试过使用 psycopg2 而不是 pg8000 吗? - Kunal
是的。如所述,psycopg2尚不支持C编写。目前不支持诸如pandas之类的库,也不支持用其他语言编写的扩展。 - BigData-Guru
在Glue中使用pg8000,您是否需要包含任何外部库? - Kunal

0
根据我的测试(使用相同的场景),书签功能不起作用。当作业运行多次时,会插入重复数据。我通过每天从S3位置中删除文件(通过lambda)并实施分段和目标表来解决了这个问题。数据将根据匹配键列进行插入/更新。

0

2
虽然我认为你是对的,但你只是在重复我的答案 :) - Matthijs

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接