火箭筒JSON -> S3 Parquet -> ETL Spark,错误:无法推断Parquet的模式

3
似乎这应该很容易,就像这组功能的核心用例一样,但问题接踵而至。最新的问题是尝试通过Glue Dev端点运行命令(包括PySpark和Scala端点)。按照此处的说明进行操作:https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_catalog(database="mydb", table_name="mytable")

生成此错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/dynamicframe.py", line 557, in from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/context.py", line 136, in create_dynamic_frame_from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/data_source.py", line 36, in getFrame
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'

它还会在其中一行设置中生成此警告:
18/06/26 19:09:35 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

总体设置相当简单:我们有一个入站的Kinesis数据流,一个处理器来生成JSON Kinesis数据流,一个配置为将该JSON流写入S3中Parquet文件的Kinesis firehose流,以及必要的Glue目录配置使其发生。
Athena可以很好地查看数据,但Scala / PySpark脚本会出错。
有什么想法/建议吗?
1个回答

3

好的,仍然不清楚为什么会发生这种情况,但是找到了解决方法!

基本上,不要使用生成的代码:

val datasource0 = glueContext.getCatalogSource(
        database = "db",
        tableName = "myTable",
        redshiftTmpDir = "",
        transformationContext = "datasource0"
    ).getDynamicFrame()

请使用以下代码:
val crawledData = glueContext.getSourceWithFormat(
        connectionType = "s3",
        options = JsonOptions(s"""{"path": "s3://mybucket/mytable/*/*/*/*/"}"""),
        format = "parquet",
        transformationContext = "source"
    ).getDynamicFrame()

这里的关键是 */*/*/*/ - 如果我只指定根文件夹,我会得到 Parquet 错误,并且(显然)正常的 /**/* 通配符不起作用。


我猜这完全忽略了粘合目录表,对吗?你知道它是否支持排除文件吗? - 4knahs
是的,完全忽略它。我们实际上正在直接使用Spark(仅在Glue作业中),而根本不需要通过Glue Context。不知道它是否支持文件排除。 - Narfanator
谢谢你的回答!你有没有遇到过S3令牌在长时间作业中过期导致临时存储桶出现问题的情况?如果有,我可以为你创建一个新的SO问题来回答。 - 4knahs
我们没有使用临时存储桶,或者至少没有过期的存储桶。保留所有中间产品以进行调试和恢复非常有用。整个过程不必一次性成功! - Narfanator

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接