pyspark错误:AttributeError: 'SparkSession'对象没有属性'parallelize'

24

我正在Jupyter笔记本上使用pyspark。以下是Spark的设置方式:

import findspark
findspark.init(spark_home='/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive', python_path='python2.7')

    import pyspark
    from pyspark.sql import *

    sc = pyspark.sql.SparkSession.builder.master("yarn-client").config("spark.executor.memory", "2g").config('spark.driver.memory', '1g').config('spark.driver.cores', '4').enableHiveSupport().getOrCreate()

    sqlContext = SQLContext(sc)

然后当我执行:

spark_df = sqlContext.createDataFrame(df_in)

其中df_in是一个Pandas数据框。然后我得到了以下错误:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-9-1db231ce21c9> in <module>()
----> 1 spark_df = sqlContext.createDataFrame(df_in)


/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
    297         Py4JJavaError: ...
    298         """
--> 299         return self.sparkSession.createDataFrame(data, schema, samplingRatio)
    300 
    301     @since(1.3)

/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio)
    520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    521         else:
--> 522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    523         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    524         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive/python/pyspark/sql/session.pyc in _createFromLocal(self, data, schema)
    400         # convert python objects to sql data
    401         data = [schema.toInternal(row) for row in data]
--> 402         return self._sc.parallelize(data), schema
    403 
    404     @since(2.0)

AttributeError: 'SparkSession' object has no attribute 'parallelize'

有人知道我做错了什么吗?谢谢!

2个回答

37

SparkSession 不是 SparkContext 的替代品,而是 SQLContext 的等价物。只需像使用 SQLContext 一样使用即可:

spark.createDataFrame(...)

如果您需要访问SparkContext,请使用sparkContext属性:

spark.sparkContext

所以,如果您需要向后兼容的SQLContext,您可以:

SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

1
我尝试了:spark_df = sc.createDataFrame(df_in),但是spark_df似乎已经损坏了。在这里,spark_df = sc.createDataFrame(df_in)是正确的转换方式吗? - Edamame
只有当df_increateDataFrame的有效参数时。 - zero323
df_in 是一个 pandas 数据框。我以为它应该是有效的? - Edamame

9
无论何时我们试图从一个向后兼容的对象(如RDD或由Spark Session创建的数据框)创建数据框,您需要让您的SQL上下文意识到您的会话和上下文。

例如:

如果我创建一个RDD:

ss=SparkSession.builder.appName("vivek").master('local').config("k1","vi").getOrCreate()

rdd=ss.sparkContext.parallelize([('Alex',21),('Bob',44)])

但是如果我们想要从这个RDD创建一个DataFrame,我们需要:

sq = SQLContext(sparkContext=ss.sparkContext, sparkSession=ss)

只有这样,我们才能使用SQLContext和由pandas创建的RDD / DF。

schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True)])
df=sq.createDataFrame(rdd,schema)
df.collect()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接