如何在不扫描数据的情况下覆盖pyspark DataFrame模式?

4
这个问题与https://dev59.com/F5bfa4cB1Zd3GeqPzNXQ#37090151有关。假设我有一个pyspark DataFrame,具有特定的模式,并且我想使用我知道兼容的新模式覆盖该模式,我可以执行以下操作:
df: DataFrame
new_schema = ...

df.rdd.toDF(schema=new_schema)

很不幸,这会触发与上面链接描述相同的计算。有没有一种在元数据级别(或延迟)执行而不会急切地触发计算或转换的方法?

编辑,注意:

  • 架构可以任意复杂(嵌套等)
  • 新架构包括对描述、可空性和其他元数据的更新(如果更新了类型,则得分更高)
  • 我想避免编写自定义查询表达式生成器,除非在Spark中已经内置了一个可以根据模式/StructType生成查询的查询表达式生成器

你想要改变多少?它可以通过在特定属性上使用“cast”或“F.struct”来实现吗? - Emma
@Emma,感谢您的评论。我不想使用cast/struct或构建select语句等方法。我知道我的模式是兼容的,并且我想知道是否可以进行“零成本”模式交换。 - rav
在我看来,一个简单的select应该就可以了。由select触发的任何计算都不会增加额外的Spark阶段,因此在所有实际目的下,您可以安全地忽略它们。根据select之前的转换,可能需要在之前添加一个额外的cache - werner
@werner 如果架构可能非常复杂,并且更改包括字段描述的更改和一些安全的可空性/类型升级,那真的很“简单”吗? - rav
我相信这应该是可能的,但我不确定关于可空性。 - werner
@werner 謝謝您的貢獻,當然可能能夠生成這樣的select表達式,但我寧願避免編寫這樣的查詢生成器。支持任意模式(嵌套和元數據:可為空、說明和任意字段)可能需要相當多的工作。 - rav
2个回答

2

我自己也深入研究了一下这个问题,我想知道你对我的解决方法/POC的看法。请参阅https://github.com/ravwojdyla/spark-schema-utils。它可以转换表达式并更新属性。

假设我有两个模式,第一个没有任何元数据,我们称其为schema_wo_metadata:

{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {},
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {},
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}

第二个有额外元数据的,内部(ia)和外部(ob)都有,我们称之为schema_wi_metadata

{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {
                "description": "this is ia desc"
              },
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {
        "description": "this is ob desc"
      },
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}

现在假设我有一个具有schema_wo_metadata模式的数据集,并希望将模式与schema_wi_metadata交换:

from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType


# I assume these get generate/specified somewhere
schema_wo_metadata: StructType = ...
schema_wi_metadata: StructType = ...

# You need my extra package
spark = SparkSession.builder \
    .config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") \
    .getOrCreate()

# Dummy data with `schema_wo_metadata` schema:
df = spark.createDataFrame(data=[Row(oa=[Row(ia=0, ib=1)], ob=3.14),
                                 Row(oa=[Row(ia=2, ib=3)], ob=42.0)],
                           schema=schema_wo_metadata)

_jdf = spark._sc._jvm.io.github.ravwojdyla.SchemaUtils.update(df._jdf, schema.json())
new_df = DataFrame(_jdf, df.sql_ctx)

现在,new_df包含了schema_wi_metadata,例如:

new_df.schema["oa"].dataType.elementType["ia"].metadata
# -> {'description': 'this is ia desc'}

有什么意见吗?


1
此内容已发布于:https://issues.apache.org/jira/browse/SPARK-38904,请考虑给予+1,如果您也认为这很有用的话。 - rav
希望 https://github.com/apache/spark/pull/37011 能够解决 Spark 自身中的这个问题/疑问。 - rav

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接