How to fetch results from Spark SQL using PySpark?


I have just started learning Spark and I am writing my Spark code in Python.

I am able to read data from a Parquet file, store it in a DataFrame, and then register it as a temporary table.

However, it does not print the result of the executed query. Please help me debug this.

Code:

import os
os.environ['SPARK_HOME']="/opt/apps/spark-2.0.1-bin-hadoop2.7/"
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext(master='local')
sqlCtx = SQLContext(sc)
df_tract_alpha = sqlCtx.read.parquet("tract_alpha.parquet")
print (df_tract_alpha.columns)
sqlCtx.registerDataFrameAsTable(df_tract_alpha, "table1")
nt = sqlCtx.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count>=500").collect()
n1 = nt[0].pageCount
print n1

Here is the result:

 Column< pageCount['pageCount'] > instead of printing the value

Here is the stack trace:

17/06/12 12:54:27 WARN BlockManager: Putting block broadcast_2 failed due to an exception
17/06/12 12:54:27 WARN BlockManager: Block broadcast_2 could not be removed as it was not found on disk or in memory
Traceback (most recent call last):
  File "/home/vn/scripts/g_s_pipe/test_code_here.py", line 66
    nt = sqlContext.sql("SELECT count(*) as pageCount FROM table1 WHERE pp_count>=500").collect()
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 310
    port = self._jdf.collectToPython()
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319
py4j.protocol.Py4JJavaError: An error occurred while calling o30.collectToPython.
: java.lang.reflect.InaccessibleObjectException: Unable to make field transient java.lang.Object[] java.util.ArrayList.elementData accessible: module java.base does not "opens java.util" to unnamed module @55deb90
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:335)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:278)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:175)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:169)
    at org.apache.spark.util.SizeEstimator$$anonfun$getClassInfo$3.apply(SizeEstimator.scala:336)
    at org.apache.spark.util.SizeEstimator$$anonfun$getClassInfo$3.apply(SizeEstimator.scala:330)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.util.SizeEstimator$.getClassInfo(SizeEstimator.scala:330)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:222)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:702)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1234)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:103)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReader(ParquetFileFormat.scala:329)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:281)

This error is triggered on n1 = ntrac.show, and that line does not appear in the code you shared...
Sorry, I pasted the wrong snippet, please see the correct code... I was trying some options other than the ones posted in the question.
Please check the updated trace now.
I have updated my answer, it couldn't be clearer. You can see that I am using your actual code, and of course I have corrected the error in the collect call.
If you run the code I gave you, does it produce the same error? That shouldn't be possible.
1 Answer


The collect function is called with parentheses ():

nt = sqlCtx.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count>=500") \
           .collect()
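
As a side note, collect() returns a list of Row objects, while indexing the DataFrame itself (e.g. result_df['pageCount']) only gives back a Column expression, which is the kind of Column< ... > output shown in the question. A minimal sketch of the difference, assuming the sqlCtx and table1 from the question:

result_df = sqlCtx.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count>=500")

print(result_df['pageCount'])   # a Column expression, not a value
rows = result_df.collect()      # actually runs the query, returns a list of Row objects
print(rows[0].pageCount)        # the count as a plain Python integer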

Example:

First, let's check our Parquet data:

$> parquet-tools head data.parquet/
a = 1
pp_count = 500

a = 2
pp_count = 750

a = 3
pp_count = 400

a = 4
pp_count = 600

a = 5
pp_count = 700
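
To reproduce this sample file, a minimal sketch (assuming a SQLContext like the one created below, writing to data.parquet):

from pyspark.sql import Row

sample = [Row(a=1, pp_count=500), Row(a=2, pp_count=750), Row(a=3, pp_count=400),
          Row(a=4, pp_count=600), Row(a=5, pp_count=700)]
sqlContext.createDataFrame(sample).write.parquet("data.parquet")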

We will run the following code:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(master='local')
sqlContext = SQLContext(sc)

df = sqlContext.read.parquet("data.parquet")
print("data columns : {} ".format(df.columns))

sqlContext.registerDataFrameAsTable(df, "table1")
results = sqlContext.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count>=500").collect()
df.show()
print("initial data count : {}".format(df.count()))
page_count = results[0].pageCount
print("page count : {}".format(page_count))

After submitting the application, the output is as follows:
data columns : ['a', 'pp_count']
+---+--------+
|  a|pp_count|
+---+--------+
|  1|     500|
|  2|     750|
|  3|     400|
|  4|     600|
|  5|     700|
+---+--------+

initial data count : 5
page count : 4
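
As an aside, the same count can also be obtained directly with the DataFrame API instead of SQL; a minimal sketch reusing the df loaded above:

page_count = df.filter(df.pp_count >= 500).count()   # same filter expressed on the DataFrame
print("page count : {}".format(page_count))          # prints 4 for the sample data above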
