使用AWS Glue或PySpark过滤DynamicFrame

3
我在AWS Glue数据目录中有一张名为“mytable”的表格。这个表格是通过连接到本地的Oracle数据库“mydb”而创建的。
我想要对结果进行过滤,只保留X_DATETIME_INSERT列(时间戳类型)大于给定时间(在这个例子中是'2018-05-07 04:00:00')的行。然后,我想要统计行数以确保计数较低(该表大约有40,000行,但只有少数行应符合筛选条件)。
以下是我的当前代码:
import boto3
from datetime import datetime
import logging
import os
import pg8000
import pytz
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from base64 import b64decode
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "mytable", transformation_ctx = "datasource0")

# Try Glue native filtering    
filtered_df = Filter.apply(frame = datasource0, f = lambda x: x["X_DATETIME_INSERT"] > '2018-05-07 04:00:00')
filtered_df.count()

这段代码运行了20分钟,然后超时了。我尝试了其他变化:

df = datasource0.toDF()
df.where(df.X_DATETIME_INSERT > '2018-05-07 04:00:00').collect()

And

df.filter(df["X_DATETIME_INSERT"].gt(lit("'2018-05-07 04:00:00'")))

执行失败了,我哪里出错了?我对Python很熟悉,但是在Glue和PySpark方面还是新手。

1个回答

4
AWS Glue 将整个数据集从 JDBC 源加载到临时 S3 文件夹中,然后再进行筛选。如果您的数据存储在 s3 中并按某些键(例如 /year/month/day)进行分区,则可以使用 pushdown-predicate 功能来加载数据子集。详情请参见 此处
val partitionPredicate = s"to_date(concat(year, '-', month, '-', day)) BETWEEN '${fromDate}' AND '${toDate}'"

val df = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()

不幸的是,这对JDBC数据源尚不起作用。


请问您能否提供相关的支持文档呢? - Infinite

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接