在Pyspark中运行简单的应用程序。
f = sc.textFile("README.md")
wc = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
我想使用foreach操作查看RDD内容:
wc.foreach(print)
这会抛出一个语法错误:
SyntaxError: invalid syntax
我错过了什么?
在Pyspark中运行简单的应用程序。
f = sc.textFile("README.md")
wc = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
我想使用foreach操作查看RDD内容:
wc.foreach(print)
这会抛出一个语法错误:
SyntaxError: invalid syntax
我错过了什么?
在Spark 2.0中(我没有测试过早期版本),简单地说:
print myRDD.take(n)
其中n是行数,myRDD在您的情况下是wc。
这个错误是因为在Python 2.6中print
不是一个函数。
你可以定义一个辅助的UDF来执行打印操作,或者使用__future__库将print
视为一个函数:
>>> from operator import add
>>> f = sc.textFile("README.md")
>>> def g(x):
... print x
...
>>> wc.foreach(g)
或者>>> from __future__ import print_function
>>> wc.foreach(print)
然而,我认为最好使用collect()
将RDD内容传回驱动程序,因为foreach
在工作节点上执行,输出可能不会出现在您的驱动程序/ shell中(在local
模式下可能会出现,但在集群上运行时不会)。
>>> for x in wc.collect():
... print x
print(wc.collect())
试试这个:
data = f.flatMap(lambda x: x.split(' '))
map = data.map(lambda x: (x, 1))
mapreduce = map.reduceByKey(lambda x,y: x+y)
result = mapreduce.collect()
如果您想查看RDD的内容,那么collect是一个选项,但它会将所有数据提取到驱动程序中,可能会出现问题。
<rdd.name>.take(<num of elements you want to fetch>)
如果您只想查看示例,则最好。
运行 foreach 并尝试打印,我不建议这样做,因为如果您在集群上运行此操作,则打印日志将仅对执行器本地,并且它将打印对该执行器可访问的数据。 print 语句不会改变状态,因此逻辑上是正确的。要获取所有日志,您需要执行类似以下操作:
**Pseudocode**
collect
foreach print
但这可能会导致作业失败,因为收集驱动程序上的所有数据可能会使其崩溃。我建议使用take命令,或者如果您想要分析它,则使用在驱动程序上的sample采集,或将其写入文件,然后进行分析。
foreach()
,您可以使用以下代码:for row in f.take(f.count()): print(row)
。 - emmagras