将Pandas转换为Spark时出现的TypeError错误

3

我在这里查找了这个问题,但之前的解决方案对我没有用。我有一个这样格式的数据框:

mdf.head()
    dbn       boro       bus
0   17K548  Brooklyn    B41, B43, B44-SBS, B45, B48, B49, B69
1   09X543  Bronx       Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,...
4   28Q680  Queens      Q25, Q46, Q65
6   14K474  Brooklyn    B24, B43, B48, B60, Q54, Q59

还有几列,但我已将它们排除在外(地铁线和测试分数)。 当我尝试将此DataFrame转换为Spark DataFrame时,会出现以下错误。

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-30-1721be5c2987> in <module>()
----> 1 sparkdf = sqlc.createDataFrame(mdf)

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
    423             rdd, schema = self._createFromRDD(data, schema, samplingRatio)
    424         else:
--> 425             rdd, schema = self._createFromLocal(data, schema)
    426         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    427         jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema)
    339 
    340         if schema is None or isinstance(schema, (list, tuple)):
--> 341             struct = self._inferSchemaFromList(data)
    342             if isinstance(schema, (list, tuple)):
    343                 for i, name in enumerate(schema):

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data)
    239             warnings.warn("inferring schema from dict is deprecated,"
    240                           "please use pyspark.sql.Row instead")
--> 241         schema = reduce(_merge_type, map(_infer_schema, data))
    242         if _has_nulltype(schema):
    243             raise ValueError("Some of types cannot be determined after inferring")

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
    860         nfs = dict((f.name, f.dataType) for f in b.fields)
    861         fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
--> 862                   for f in a.fields]
    863         names = set([f.name for f in fields])
    864         for n in nfs:

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
    854     elif type(a) is not type(b):
    855         # TODO: type cast (such as int -> long)
--> 856         raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
    857 
    858     # same type

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

根据我所了解的,这可能是由于标题被视为数据而导致的问题。据我所知,您无法从DataFrame中删除标题,因此我该如何解决此错误并将此DataFrame转换为Spark DataFrame?

编辑:以下是我创建Pandas DF并解决问题的代码。

sqlc = SQLContext(sc)
df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig'))
df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']]
df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig'))
df1 = df1.rename(columns = {'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg'})
mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left')
mdf = mdf[pd.notnull(mdf['DBN'])]
mdf.to_csv('merged.csv', encoding = 'utf-8')
ndf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv")

这段代码的最后一行,从我的本地机器加载它最终使我能够将CSV正确转换为数据框,但我的问题仍然存在。为什么一开始它不能工作?
4个回答

3
我曾经碰到过同样的问题,最终发现其中一个条目的值为长度为0(或为空)。_inferScheme 命令对数据框中的每一行运行并确定其类型。默认情况下,假设空值为 Double 类型,而其他值则为 String 类型。这两种类型无法通过 _merge_type 命令合并。该问题已在 https://issues.apache.org/jira/browse/SPARK-18178 中报告,但解决方法可能是向 createDataFrame 命令提供模式。以下代码可在 PySpark 2.0 中重现此问题。
import pandas as pd
from io import StringIO
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n'))
sqlContext.createDataFrame(test_df).registerTempTable('Test')
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1")
o_qry.first()

3
您可以使用反射从Row对象的RDD中推断模式,例如:
from pyspark.sql import Row
mdfRows = mdf.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)

那是否达到了预期的结果?

我遇到了一个错误 AttributeError: 'DataFrame' object has no attribute 'map' - gold_cy
哦,mdf 是一个 pandas DataFrame 吗?我错误地认为它是一个 Spark RDD。你需要使用 pandas 吗?或者你可以创建一个 Spark RDD,然后像上面那样将其转换为 Spark DataFrame 吗? - user4601931
这是我面临的问题。如果我使用com.databricks.spark.csv将其作为RDD加载以读取CSV文件,则它会完全忽略dbn列并将所有内容向左移动一列。我不确定如何避免这个问题,所以我通过Pandas的read_csv加载了它,这样可以保留原始CSV的格式。 - gold_cy
你的意思是你尝试过 spark.read.csv("/path/to/file.csv", header=True),但是没有成功? - user4601931
我使用了spark-cdv包,因为我使用的是Spark 1.6.2(这是HomeBrew上可用的最新版本)。我应该更新到2.0吗?因为我知道他们将read.csv直接内联到程序中了。问题在于我假设CSV具有编码字符和/或尾随/前导空格。我将在帖子中更新如何创建pandas框架。我还可以通过使用适当的编码将其保存在本地计算机上来解决问题,但这可能不是Apache Spark的良好实践。 - gold_cy
显示剩余3条评论

2
您也可以尝试这样做:
def create_spark_dataframe(file_name):
   """
   will return the spark dataframe input pandas dataframe
   """
   pandas_data_frame = pd.read_csv(file_name, converters= {"PRODUCT": str})
   for col in pandas_data_frame.columns:
   if ((pandas_data_frame[col].dtypes != np.int64) & 
      (pandas_data_frame[col].dtypes != np.float64)):
    pandas_data_frame[col] = pandas_data_frame[col].fillna('')

   spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
   return spark_data_frame

这将解决你的问题。

0
这里的问题是pandas默认使用np.nan(不是数字)作为空字符串的值,这在转换为spark.df时会导致模式混淆。
基本方法是将np.nan转换为None,这样就可以正常工作了。
不幸的是,pandas不允许您使用None来填充fillna。由于np.nan不遵循自等条件,因此您可以使用巧妙的技巧来解决这个问题。
new_series = new_series.apply(lambda x: None if x != x else x)

然后, display(sqlContext.createDataFrame(new_df_1)) 就可以正常工作了。

如果有人能够建议我将nan转换为None的直接方法,我会很高兴。 - Itachi
1
我用0来填充NaN,但仍未解决错误。 - LePuppy
是的,我是。但这不是你的代码所做的吗?我迷失了。 - LePuppy
在我的代码中,它是一个Pandas Series,您需要使用new_df_1['column_name'].apply(func)将您的Dataframe转换为Series级别。 - Itachi
然而,这并没有解决我的问题。也许我们在不知情的情况下谈论着两件不同的事情。 - LePuppy
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接