I have just started learning Spark, and I am writing my Spark code in Python.
I can read data from a Parquet file into a DataFrame and then register it as a temporary table.
However, the result of the query I execute is not printed. Please help me debug this.
Code:
import os
os.environ['SPARK_HOME'] = "/opt/apps/spark-2.0.1-bin-hadoop2.7/"

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext(master='local')
sqlCtx = SQLContext(sc)

# Read the Parquet file into a DataFrame and register it as a temp table
df_tract_alpha = sqlCtx.read.parquet("tract_alpha.parquet")
print(df_tract_alpha.columns)
sqlCtx.registerDataFrameAsTable(df_tract_alpha, "table1")

# Run the aggregate query, then pull the scalar out of the first Row
nt = sqlCtx.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count >= 500").collect()
n1 = nt[0].pageCount
print(n1)
This is the result: it prints
Column< pageCount['pageCount'] >
instead of printing the value.
Here is the stack trace:
17/06/12 12:54:27 WARN BlockManager: Putting block broadcast_2 failed due to an exception
17/06/12 12:54:27 WARN BlockManager: Block broadcast_2 could not be removed as it was not found on disk or in memory
Traceback (most recent call last):
  File "/home/vn/scripts/g_s_pipe/test_code_here.py", line 66
    nt = sqlContext.sql("SELECT count(*) as pageCount FROM table1 WHERE pp_count >= 500").collect()
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 310
    port = self._jdf.collectToPython()
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319
py4j.protocol.Py4JJavaError: An error occurred while calling o30.collectToPython.
: java.lang.reflect.InaccessibleObjectException: Unable to make field transient java.lang.Object[] java.util.ArrayList.elementData accessible: module java.base does not "opens java.util" to unnamed module @55deb90
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:335)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:278)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:175)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:169)
    at org.apache.spark.util.SizeEstimator$$anonfun$getClassInfo$3.apply(SizeEstimator.scala:336)
    at org.apache.spark.util.SizeEstimator$$anonfun$getClassInfo$3.apply(SizeEstimator.scala:330)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.util.SizeEstimator$.getClassInfo(SizeEstimator.scala:330)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:222)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:702)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1234)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:103)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReader(ParquetFileFormat.scala:329)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:281)
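For context on what I expected: collect() on the aggregate query should return a list of pyspark.sql.Row objects, and a Row behaves much like a named tuple, so nt[0].pageCount should yield a plain Python int. A minimal sketch of that access pattern, using collections.namedtuple as a stand-in for Row and a hypothetical count of 42 (no Spark needed to illustrate it):

```python
from collections import namedtuple

# pyspark.sql.Row fields are addressable by attribute and by index,
# much like a namedtuple. Simulate the list that collect() returns for
# "SELECT COUNT(*) AS pageCount ..." (the value 42 is made up):
Row = namedtuple("Row", ["pageCount"])
nt = [Row(pageCount=42)]

n1 = nt[0].pageCount  # attribute access, as in my code above
print(n1)             # -> 42
```

So the attribute access itself looks right; the problem seems to be that collect() never returns a result at all, as the stack trace above shows.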