如何在AWS Glue PySpark中运行并行线程?

7

我有一个Spark作业,它将从多个相同的表中提取数据进行转换。基本上是在一个表格列表上循环迭代,查询目录表添加时间戳,然后推入到Redshift(如下面的示例)。

这个作业大约需要30分钟才能完成。是否有办法在同一个spark/glue上下文中并行运行这些作业呢?如果可避免,我不想创建单独的glue作业。

import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *


# query the runtime arguments
args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)

# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()

tables = []

for table in tables:
    catalog_table = glueContext.create_dynamic_frame.from_catalog(
        database="test", table_name=table, transformation_ctx=table
    )
    data_set = catalog_table.toDF().withColumn(
        "batchLoadTimestamp", lit(job_execution_timestamp)
    )

    # covert back to glue dynamic frame
    export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")

    # remove null rows from dynamic frame
    non_null_records = DropNullFields.apply(
        frame=export_frame, transformation_ctx="non_null_records"
    )

    temp_dir = os.path.join(args["TempDir"], redshift_table_name)

    stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=non_null_records,
        catalog_connection=args["redshift_catalog_connection"],
        connection_options={
            "dbtable": f"{args['target_schema']}.{redshift_table_name}",
            "database": args["target_database"],
            "preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
        },
        redshift_tmp_dir=temp_dir,
        transformation_ctx="stores_redshiftSink",
    ) ```
1个回答

9

以下是加快此过程的方法:

  1. 启用作业的并发执行。
  2. 分配足够数量的 DPU。
  3. 将表列表作为参数传递。
  4. 使用 Glue 工作流程或步骤函数并行执行作业。

现在假设您有 100 张表需要摄取,您可以将列表分成每个包含 10 张表,并同时运行 10 次以并发方式运行作业。

由于数据将被并行加载,因此 Glue 作业运行时间将减少,从而产生更少的费用。

更快的备选方法是使用 Redshift 实用程序直接进行操作。

  1. 在 Redshift 中创建表并将 batchLoadTimestamp 列保留为当前时间戳的默认值。
  2. 现在创建复制命令并直接从 S3 加载数据到表中。
  3. 使用 Glue python shell job 运行 copy 命令,利用 pg8000。

为什么这种方法会更快呢?因为 Spark Redshift JDBC 连接器首先将 Spark DataFrame 卸载到 S3,然后准备一个复制命令来卸载到 Redshift 表。直接运行复制命令时,您将消除运行卸载命令和读取数据到 Spark DataFrame 的开销。


1
这两种方法都非常适合我的使用情况。谢谢! - sewardth

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接