Data type conversion: converting a Pandas DataFrame to PySpark format in Foundry

For those working in the Foundry environment: I'm trying to build a pipeline in "Code Repository" that processes a raw dataset coming from an Excel file into a clean dataset, which I then analyze in "Contour". I'm using Python for this, but the pipeline seems to run on PySpark, and at some point I have to convert the dataset I cleaned with pandas into PySpark format, which is where I'm stuck.
I've looked at several posts on Stack Overflow about converting a Pandas DF to a PySpark DF, but so far none of the solutions has worked. Whenever I run the conversion, there is always some data type that fails to convert, even when I force a schema.
The Python code section has been tested successfully in Spyder (importing and exporting the Excel file) and produces the expected results. The problem only appears when I need to convert to PySpark format.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

@transform_pandas(
    Output("/MDM_OUT_OF_SERVICE_EVENTS_CLEAN"),
    OOS_raw=Input("/MDM_OUT_OF_SERVICE_EVENTS"),
)
def DA_transform(OOS_raw):

    ''' Code Section in Python '''
    # ... pandas cleaning steps that produce OOS_dup from OOS_raw ...

    # Explicit schema built from the cleaned pandas dataframe
    mySchema = StructType([StructField(OOS_dup.columns[0], IntegerType(), True),
                           StructField(OOS_dup.columns[1], StringType(), True),
                           ...])

    OOS_out = sqlContext.createDataFrame(OOS_dup, schema=mySchema,
                                         verifySchema=False)

    return OOS_out

At some point I got this error message:

AttributeError: 'unicode' object has no attribute 'toordinal'.

According to this post, the 'unicode' object has no attribute 'toordinal' error in PySpark arises because PySpark fails to convert the data to the date type.
In pandas, though, the data is stored as datetime64[ns]. I've tried casting that column to string and to integer, but that fails as well.
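One workaround often suggested for this class of error is to strip the datetime64[ns] dtype on the pandas side before handing the frame to Spark, so that Spark receives plain datetime.date objects rather than strings. A minimal sketch (the column name OOS_DATE is taken from the dtype listing below; the data and conversion call are illustrative, not the code from this post):

import pandas as pd

# Illustrative frame with a single datetime64[ns] column
df = pd.DataFrame({"OOS_DATE": pd.to_datetime(["2019-01-15", "2019-02-03"])})

# .dt.date turns each Timestamp into a plain datetime.date, which
# PySpark can map to DateType without calling toordinal on a string
df["OOS_DATE"] = df["OOS_DATE"].dt.date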
Here is the dataset produced by the Python code (shown as an image in the original post). These are the data types pandas reports once the dataset has been cleaned:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4972 entries, 0 to 4971
Data columns (total 51 columns):
OOS_ID                       4972 non-null int64
OPERATOR_CODE                4972 non-null object
ATA_CAUSE                    4972 non-null int64
EVENT_CODE                   3122 non-null object
AC_MODEL                     4972 non-null object
AC_SN                        4972 non-null int64
OOS_DATE                     4972 non-null datetime64[ns]
AIRPORT_CODE                 4915 non-null object
RTS_DATE                     4972 non-null datetime64[ns]
EVENT_TYPE                   4972 non-null object
CORRECTIVE_ACTION            417 non-null object
DD_HOURS_OOS                 4972 non-null float64
EVENT_DESCRIPTION            4972 non-null object
EVENT_CATEGORY               4972 non-null object
ATA_REPORTED                 324 non-null float64
TOTAL_CAUSES                 4875 non-null float64
EVENT_NUMBER                 3117 non-null float64
RTS_TIME                     4972 non-null object
OOS_TIME                     4972 non-null object
PREV_REPORTED                4972 non-null object
FERRY_IND                    4972 non-null object
REPAIR_STN_CODE              355 non-null object
MAINT_DOWN_TIME              4972 non-null float64
LOGBOOK_RECORD_IDENTIFIER    343 non-null object
RTS_IND                      4972 non-null object
READY_FOR_USE                924 non-null object
DQ_COMMENTS                  2 non-null object
REVIEWED                     5 non-null object
DOES_NOT_MEET_SPECS          4 non-null object
CORRECTED                    12 non-null object
EDITED_BY                    4972 non-null object
EDIT_DATE                    4972 non-null datetime64[ns]
OUTSTATION_INDICATOR         3801 non-null object
COMMENT_TEXT                 11 non-null object
ATA_CAUSE_CHAPTER            4972 non-null int64
ATA_CAUSE_SECTION            4972 non-null int64
ATA_CAUSE_COMPONENT          770 non-null float64
PROCESSOR_COMMENTS           83 non-null object
PARTS_AVAIL_AT_STATION       4972 non-null object
PARTS_SHIPPED_AT_STATION     4972 non-null object
ENGINEER_AT_STATION          4972 non-null object
ENGINEER_SENT_AT_STATION     4972 non-null object
SOURCE_FILE                  4972 non-null object
OOS_Month                    4972 non-null float64
OOS_Hour                     4972 non-null float64
OOS_Min                      4972 non-null float64
RTS_Month                    4972 non-null float64
RTS_Hour                     4972 non-null float64
RTS_Min                      4972 non-null float64
OOS_Timestamp                4972 non-null datetime64[ns]
RTS_Timestamp                4972 non-null datetime64[ns]
dtypes: datetime64[ns](5), float64(12), int64(5), object(29)

Could you show the Pandas dataframe? - vikrant rana
Hi, I've added a picture of the dataset to the main post. Thanks. - Yoan B. M.Sc
Thanks. Could you show the existing schema of the pandas dataframe? - vikrant rana
1 Answer


In case it helps anyone, I found in the official Foundry documentation how to properly convert between a pandas and a PySpark DF.

OOS_dup is the Pandas dataframe I want to convert back to Spark.

import pandas as pd
from pyspark.sql import functions as F

# Extract the name of each column together with its data type in pandas
col_names = OOS_dup.columns
col_types = [OOS_dup[c].dtype.name for c in col_names]

df_schema = pd.DataFrame({"field": col_names, "data_type": col_types})

# Convert to Spark first (everything as strings); withColumn is a Spark
# method, so createDataFrame has to happen before the fix-ups below
OOS_out = sqlContext.createDataFrame(OOS_dup.astype(str))

# Define a function to replace the string remnants of missing values
# ("NaN"/"nan"/"NaT") with null
def replace_missing(df, col_names):
    for c in col_names:
        df = df.withColumn(c, F.when(df[c].isin("NaN", "nan", "NaT"), None).otherwise(df[c]))
    return df

# Replace missing values
OOS_out = replace_missing(OOS_out, col_names)

# Define a function to cast each column to the proper Spark type,
# driven by the pandas dtype recorded in df_schema
def change_type(df, col_names, dtypes):
    for c, t in zip(col_names, dtypes):
        if t == "float64":
            df = df.withColumn(c, df[c].cast("double"))
        elif t == "int64":
            df = df.withColumn(c, df[c].cast("int"))
        elif t == "datetime64[ns]":
            df = df.withColumn(c, df[c].cast("date"))
        else:
            df = df.withColumn(c, df[c].cast("string"))
    return df

# Cast each column to the proper data type
OOS_out = change_type(OOS_out, df_schema["field"], df_schema["data_type"])
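As a sanity check, the two helpers above can be exercised outside Foundry on a toy frame. This sketch assumes a local SparkSession; the frame `toy` and its column names are hypothetical, chosen to mirror the dtypes in the question:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical toy frame: one datetime64[ns] column and one float
# column containing a missing value, as in the real dataset
toy = pd.DataFrame({
    "OOS_DATE": pd.to_datetime(["2019-01-15", "2019-02-03"]),
    "DD_HOURS_OOS": [5.0, float("nan")],
})

sdf = spark.createDataFrame(toy.astype(str))
sdf = replace_missing(sdf, toy.columns)
sdf = change_type(sdf, toy.columns, [toy[c].dtype.name for c in toy.columns])
sdf.printSchema()  # OOS_DATE: date, DD_HOURS_OOS: double

The design point is that the pandas dtype names recorded in df_schema drive the Spark casts, so no hand-written StructType is needed and new columns are picked up automatically.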
