将Pandas数据框转换为Spark数据框时出现错误

72

我正在尝试将Pandas的DF转换为Spark的DF。 DF的头部:

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691

代码:

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)

我遇到了一个错误:

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

2
我的第一个假设是文件包含了数字和字符串在同一列中,Spark 对此感到困惑。然而,在导入时 Pandas 应该可以处理它。 - Ivan Sudos
你的数据框是否有列名? - MaxU - stand with Ukraine
不需要,但如果您将其放入DF头输出中会更有帮助。尝试跳过第11列(带有NA)并重新运行您的代码。 - MaxU - stand with Ukraine
1
为什么你不使用 spark-csv - Alberto Bonsanto
我曾经看到过这个有趣的处理方式 df = df.replace({np.nan: None}),在 AWS Glue 的上下文中(Pandas DF -> Spark DF -> Glue Dynamic DF)。问题在于 NA 值会让 Spark 混淆(它们是“float”),而该列的其余部分是整数或字符串。 - Tomasz Gandor
显示剩余5条评论
7个回答

99

我写了这个脚本,在我的10个pandas数据框中它起作用了。

from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()
    elif f == 'float32': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

您也可以在此代码片段中查看。

使用此方法,您只需调用spark_df = pandas_to_spark(pandas_df)即可。


6
已验证这一切工作正常,还验证了从pyspark输出到parquet再进入scala的输出。谢谢Gonzalo。虽然我不知道如何实现,但这似乎是对开源社区的杰出贡献。也许可以起名为pd.to_sparkdf()之类的。 - Tony Fraser
2
Gonzalo,我刚刚fork了你的gist来支持ArrayType[StringType]。再次感谢。读者们,这是从pandas到pyspark和scala spark的绝佳解决方案。 - Tony Fraser
2
警告:如果您尝试转换由日期和时间组成的datetime对象(例如pd.to_datetime('2020-01-01 13:45:12')),则使用您的方法会丢失时间信息。为了解决这个问题,请将DateType()更改为TimestampType() - tschmelz
感谢您提供的优美解决方案。您的函数pandas_to_spark()在我的使用情况下运行无误,我可以显示结果。但是,我不能保存生成的数据框,也不能执行简单的过滤语句。请问您能否帮助我找出原因?https://stackoverflow.com/questions/69974539/sparkerror-pickleexception-expected-zero-arguments-for-construction-of-classdi - Rens
1
请求更改 elif f == 'float64': return FloatType()elif f == 'float64': return DoubleType(),然后在该行之后添加 elif f == 'float32': return FloatType() - Loqz
显示剩余3条评论

49

通过以下方式强制使用架构,可以避免与类型相关的错误:

注意: 创建了一个文本文件 (test.csv),其中包含原始数据(如上所示)和虚构的列名("col1", "col2",...,"col25")。

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

pdDF = pd.read_csv("test.csv")

pandas数据帧的内容:

       col1     col2    col3    col4    col5    col6    col7    col8   ... 
0      10000001 1       0       1       12:35   OK      10002   1      ...
1      10000001 2       0       1       12:36   OK      10002   1      ...
2      10000002 1       0       4       12:19   PA      10003   1      ...

接下来,创建模式:

from pyspark.sql.types import *

mySchema = StructType([ StructField("col1", LongType(), True)\
                       ,StructField("col2", IntegerType(), True)\
                       ,StructField("col3", IntegerType(), True)\
                       ,StructField("col4", IntegerType(), True)\
                       ,StructField("col5", StringType(), True)\
                       ,StructField("col6", StringType(), True)\
                       ,StructField("col7", IntegerType(), True)\
                       ,StructField("col8", IntegerType(), True)\
                       ,StructField("col9", IntegerType(), True)\
                       ,StructField("col10", IntegerType(), True)\
                       ,StructField("col11", StringType(), True)\
                       ,StructField("col12", StringType(), True)\
                       ,StructField("col13", IntegerType(), True)\
                       ,StructField("col14", IntegerType(), True)\
                       ,StructField("col15", IntegerType(), True)\
                       ,StructField("col16", IntegerType(), True)\
                       ,StructField("col17", IntegerType(), True)\
                       ,StructField("col18", IntegerType(), True)\
                       ,StructField("col19", IntegerType(), True)\
                       ,StructField("col20", IntegerType(), True)\
                       ,StructField("col21", IntegerType(), True)\
                       ,StructField("col22", IntegerType(), True)\
                       ,StructField("col23", IntegerType(), True)\
                       ,StructField("col24", IntegerType(), True)\
                       ,StructField("col25", IntegerType(), True)])

注意: True(意味着允许为空)

创建Pyspark数据框架:

df = spark.createDataFrame(pdDF,schema=mySchema)

确认Pandas数据帧现在已经转换为PySpark数据帧:

确认Pandas数据帧现在已经转换为PySpark数据帧:

type(df)

输出:

pyspark.sql.dataframe.DataFrame

顺便提一句:

回应Kate在下面的评论-要强制实施一般的(字符串)模式,可以执行以下操作:

df=spark.createDataFrame(pdDF.astype(str)) 

3
可以将模式创建部分概括为仅创建特定类型的所有列吗?例如,只需告诉它将所有列都设置为字符串类型(而不是逐个分配每个列)。 - Kate
1
df=spark.createDataFrame(pdPD.astype(str)) - Grant Shannon
嗨Grant,在创建'mySchema'的步骤中,您是否必须键入所有内容?有没有办法从pandas数据帧的示例中提取模式?谢谢。 - mel el
是的 - 必须全部打出来(复制并粘贴并进行必要的更改)。我发现尝试让Spark数据框从Pandas数据框中推断模式(如上面的原始问题)太冒险了。我的看法是强制/施加正确的模式是最低风险的策略。如果您无法最初强制执行所需的模式,则快速且简单的方法是在所有内容上强制执行字符串模式(如上所示),然后在稍后的阶段更正类型。 - Grant Shannon
不必手动输入整个模式,您可以运行spark.createDataFrame(pandas_data_frame).schema来打印推断出的模式,并根据需要进行调整。 - rgrosskopf

49

您需要确保Pandas数据帧的列适合Spark正在推断的类型。如果您的Pandas数据帧列中列举了以下类似内容:

pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object

如果你遇到了那个错误,请尝试:

df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)

现在,请确保.astype(str)实际上是你想要的列的类型。基本上,当底层Java代码尝试从Python中的对象推断类型时,它使用一些观察结果并猜测,如果该猜测不适用于要从pandas转换为spark的全部数据列,则会失败。


1
我发现这非常有帮助。跟进问题:当我按照这些步骤对自己的数据框进行操作时,我没有看到pd.info()有任何变化。数据框本身如何改变?在使用.astype(str)后,我如何检查pandas DataFrame是否已更改? - EntryLevelR

17

在Spark版本>= 3中,您可以使用一行代码将Pandas数据帧转换为PySpark数据帧

使用spark.createDataFrame(pandasDF)

dataset = pd.read_csv("data/AS/test_v2.csv")

sparkDf = spark.createDataFrame(dataset);

如果您对Spark会话变量感到困惑,Spark会话如下:

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

1
谢谢!我花了很多时间构建一个将pandas转换为spark的转换器,甚至为此创建了一个github仓库。这确实使得转换变得容易,至少对于简单的数据类型来说是如此。 - Tony Fraser
1
当无法推断列类型时,这可能会引发错误。因此,实际上使用Gonzalo Garcia的答案是安全的。 - Stan

11

我已经使用你的数据尝试过了,它可以正常工作:

%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.read_csv("test.csv")
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()

对于我的数据,它需要很长时间。 - luminousmen
请参见“[答案]”和完全基于代码的解释。虽然这可能在技术上是正确的,但它并没有解释为什么它可以解决问题或应该被选为答案。我们应该在帮助解决问题的同时进行教育。 - the Tin Man

2

我稍微精简了最佳答案:

import pyspark.sql.types as ps_types


def get_equivalent_spark_type(pandas_type):
    """
        This method will retrieve the corresponding spark type given a pandas
        type.

        Args:
            pandas_type (str): pandas data type

        Returns:
            spark data type
    """
    type_map = {
        'datetime64[ns]': ps_types.TimestampType(),
        'int64': ps_types.LongType(),
        'int32': ps_types.IntegerType(),
        'float64': ps_types.DoubleType(),
        'float32': ps_types.FloatType()}
    if pandas_type not in type_map:
        return ps_types.StringType()
    else:
        return type_map[pandas_type]


def pandas_to_spark(spark, pandas_df):
    """
        This method will return a spark dataframe given a pandas dataframe.

        Args:
            spark (pyspark.sql.session.SparkSession): pyspark session
            pandas_df (pandas.core.frame.DataFrame): pandas DataFrame

        Returns:
            equivalent spark DataFrame
    """
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    p_schema = ps_types.StructType([
        ps_types.StructField(column, get_equivalent_spark_type(pandas_type))
        for column, pandas_type in zip(columns, types)])

    return spark.createDataFrame(pandas_df, p_schema)

0

我曾经收到过类似的错误信息,在我的情况下是因为我的pandas数据框包含了NULL值。我建议在转换为spark之前尝试在pandas中处理这个问题(这在我的情况下解决了问题)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接