Using Parquet files with column names containing special characters in PySpark

Main goal

Show, or select columns from, a Spark DataFrame read from a Parquet file. None of the solutions mentioned in other forum threads worked in our case.

Problem

When a Parquet file is read and queried with Spark, problems arise because of special characters among " ,;{}()\n\t=" in the column names. The problem was reproduced with a simple Parquet file containing two columns and five rows. The column names are:

- SpeedReference_Final_01 (RifVel_G0)
- SpeedReference_Final_02 (RifVel_G1)

The error raised is:

Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

We are using PySpark in Python; the solutions we experimented with can be grouped as follows:
  1. Solutions based on renaming the columns - [spark.read.parquet + rename of the obtained DataFrame]
    Several variants were tried:

    • withColumnRenamed (Issue 2 in the script)
    • toDF (Issue 3)
    • alias (Issue 5)

    but none of them worked in our case.

  2. Read the Parquet file into a Pandas DataFrame, then create a new Spark DataFrame from it - [pd.read_parquet + spark.createDataFrame]
    This solution works on a small Parquet file (Issue 0, i.e. the WORKAROUND in the script): even though the column names contain special characters, the resulting Spark DataFrame can be queried successfully. Unfortunately it is not feasible for our large Parquet files (600,000 rows x 1,000 columns each), since creating the Spark DataFrame this way takes far too long.

  3. Reading the Parquet file into a Spark DataFrame and building a new Spark DataFrame from its rdd and a renamed schema is not viable, because extracting the rdd from the Spark DataFrame raises the same error (Issue 4).

  4. Read the Parquet file with a prefixed schema that avoids the special characters - [spark.read.schema(...).parquet]
    This solution does not work either: the data of the affected columns comes back null/None, because the renamed columns do not exist in the original file (see the sketch right after this list).
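For completeness, here is a minimal sketch of attempt 4, which the script below does not cover. The replacement field names RifVelG0/RifVelG1 and the variable DF_PREFIX are illustrative; spark and filename are assumed to be defined as in the script below.

from pyspark.sql.types import StructType, StructField, FloatType

# Hypothetical reproduction of attempt 4: force a schema whose field
# names avoid the special characters while reading the file.
renamed_schema = StructType([
    StructField("RifVelG0", FloatType(), nullable=True),
    StructField("RifVelG1", FloatType(), nullable=True),
])

DF_PREFIX = spark.read.schema(renamed_schema).parquet(filename)
DF_PREFIX.show()
# The read itself succeeds, but Parquet resolves columns by name:
# since "RifVelG0"/"RifVelG1" do not exist in the file, every value
# in the resulting DataFrame is null.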

The following Python code summarizes the solutions described above; it has been tested on the sample Parquet file.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col

import pandas as pd

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Select file
filename = 'D:/Simple.parquet'

issue_num = 0 # Workaround to issues (Equivalent to no issue)
#issue_num = 1 # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
#issue_num = 2 # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
#issue_num = 3 # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
#issue_num = 4 # Issue 4 - Unable to extract rdd from renamed dataframe 
#issue_num = 5 # Issue 5 - Unable to select column with alias

if issue_num == 0:

    ################################################################################################
    # WORKAROUND - Create Spark data frame from Pandas dataframe
    df_pd = pd.read_parquet(filename)
    DF = spark.createDataFrame(df_pd)
    print('WORKAROUND')
    DF.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # +-----------------------------------+-----------------------------------+

    ################################################################################################
    # Correct management of columns with  invalid characters when using spark.createDataFrame
    # spark.createDataFrame: Create a dataframe with two columns with  invalid characters - OK
    # DFCREATED
    schema = StructType(
        [
            StructField("SpeedReference_Final_01 (RifVel_G0)", FloatType(), nullable=True),
            StructField("SpeedReference_Final_02 (RifVel_G1)", FloatType(), nullable=True)
        ]
    )

    row_in = [(553.523,720.372), (553.523,720.372), (553.523,720.372), (553.523,720.372), (553.523,720.372)]

    rdd=spark.sparkContext.parallelize(row_in)
    DFCREATED = spark.createDataFrame(rdd, schema)
    DFCREATED.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # +-----------------------------------+-----------------------------------+
    DF_SEL_VAR_CREATED = DFCREATED.select(DFCREATED.columns[0]).take(2)
    for el in DF_SEL_VAR_CREATED:
        print(el)
    #Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
    #Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
    
else:
    # spark.read: read file into dataframe - OK
    DF = spark.read.parquet(filename)
    print('ORIGINAL SCHEMA')
    DF.printSchema()
    # root
    #  |-- SpeedReference_Final_01 (RifVel_G0): float (nullable = true)
    #  |-- SpeedReference_Final_02 (RifVel_G1): float (nullable = true)
    
    if issue_num == 1:
        ###############################################################################################    
        # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
        DF.show()
        # DF.select(DF.columns[0]).show()
        # DF_SEL_VAR = DF.select(DF.columns[0]).take(3)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on all 3 previous statements

    elif issue_num == 2:
        ###############################################################################################    
        # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)','RifVelG0').withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)','RifVelG1')
       
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)

        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on both previous statements

    elif issue_num == 3:
        ###############################################################################################    
        # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
        DFRENAMED = DF.toDF('RifVelG0', 'RifVelG1')
    
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)

        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on both previous statements

    elif issue_num == 4:
        ###############################################################################################    
        # Issue 4 - Unable to extract rdd from renamed dataframe 
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)','RifVelG0').withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)','RifVelG1')
        DFRENAMED_rdd = DFRENAMED.rdd
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

    elif issue_num == 5:
        ###############################################################################################    
        # Issue 5 - Unable to select column with alias
        DF_SEL_VAR = DF.select(col(DF.columns[0]).alias('RifVelG0')).take(3)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

Do you have any idea how to cope with this problem?

Any suggestion is greatly appreciated.

1 Answer

Try doing something like this:

import re
import pyspark.sql.functions as f

def remove_special_characters(string: str):
    # Keep only letters, digits, and spaces; strip everything else.
    return re.sub("[^a-zA-Z0-9 ]", "", string)

DFCREATED = DFCREATED.select(
    [
        f.col(column).alias(remove_special_characters(column))
        for column in DFCREATED.columns
    ]
)

I think you can also use this function to remove other things, such as spaces.
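For reference, a quick check of what the helper produces on the example column names (assuming the DFCREATED frame from the question; note that the space is kept by the regex above, even though a space is itself one of the invalid characters, so you may want to widen the pattern to "[^a-zA-Z0-9]"):

for column in DFCREATED.columns:
    # Print each original name next to its cleaned form.
    print(column, '->', remove_special_characters(column))
# SpeedReference_Final_01 (RifVel_G0) -> SpeedReferenceFinal01 RifVelG0
# SpeedReference_Final_02 (RifVel_G1) -> SpeedReferenceFinal02 RifVelG1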

