如何打印RDD的内容?

137

我试图将一个集合的内容打印到Spark控制台。

我有一个类型:

linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]

然后我使用如下命令:

scala> linesWithSessionId.map(line => println(line))

但是这个被打印出来了:

res1: org.apache.spark.rdd.RDD[Unit] = MappedRDD[4] at map at :19

我该如何将RDD写入控制台或保存到磁盘,以便查看其内容?


1
嗨!你有没有看到你接受的答案下面的评论?它似乎是误导性的。 - dk14
2
@dk14同意,我已重新分配了被接受的答案。 - blue-sky
RDD正在被降为二等公民,你应该使用DataFrame和“show”方法。 - Thomas Decaux
10个回答

282

如果您想查看RDD的内容,一种方法是使用collect()

myRDD.collect().foreach(println)

然而,当RDD具有数十亿行时,使用 take() 仅取一部分进行打印不是个好主意:

myRDD.take(n).foreach(println)

1
如果我在拥有数百万行的RDD上使用foreach将内容写入HDFS作为单个文件,那么在集群上是否会出现任何问题? - Shankar
1
我没有在RDD上使用saveAsTextFile的原因是,我需要将RDD内容写入多个文件,所以我使用了foreach - Shankar
如果您想要保存在单个文件中,可以在调用saveAsTextFile之前将RDD合并为一个分区,但这可能会导致问题。我认为最好的选择是在HDFS中写入多个文件,然后使用hdfs dfs --getmerge命令来合并文件。 - Oussama
你说当在RDD上使用foreach时,它会被持久化到驱动程序的RAM中,这个说法正确吗?因为我的理解是foreach会在每个工作节点[集群]上运行,而不是在驱动程序上。 - Shankar
saveAsTextFile会将每个分区写为一个文件,这正是您想要的(多个文件)。否则,如Oussama所建议的那样,可以执行rdd.coalesce(1).saveAsTextFile()以获取一个文件。如果RDD的分区太少,不符合您的喜好,可以尝试rdd.repartition(N).saveAsTextFile()。 - foghorn
显示剩余4条评论

50
< p > map 函数是一种 转换,这意味着 Spark 不会在您对 RDD 进行 操作 之前实际评估它。

要打印它,可以使用 foreach(这是一种操作):

linesWithSessionId.foreach(println)

你可以使用 RDD API 中的一个saveAs...函数(仍然是操作)将数据写入磁盘。


7
或许您需要提及“collect”以便将RDD打印到控制台上。 - zsxwing
1
foreach 本身将首先“实现” RDD,然后在每个元素上运行 println,因此这里并不真正需要使用 collect(当然您也可以使用它)... - fedragon
5
实际上,在使用foreach之前,如果没有使用collect(),我将无法在控制台上看到任何内容。 - Vittorio Cozzolino
1
在Spark 1.2.0上,这不会打印RDD。@Oussama的答案确实有效。 - Matthew Cornell
3
实际上,在我的Spark shell中,即使是在1.2.0版本中,它也完全可以正常工作。但我认为我知道这种混淆的原因:原始问题询问如何将RDD打印到Spark控制台(= shell),因此我假定他会运行本地作业,在这种情况下,“foreach”就可以正常工作。如果您正在集群上运行作业,并且希望打印rdd,则应该“collect”(正如其他评论和答案所指出的那样),以便在执行“println”之前将其发送到驱动程序。如果你的RDD太大,使用Oussama建议的“take”可能是一个好主意。 - fedragon
8
上面的回答不好。你应该取消接受它。foreach 不会在控制台上打印,它会在你的工作节点上打印。如果你只有一个节点,那么 foreach 会起作用。但是如果你只有一个节点,那么为什么要使用 Spark?只需使用 SQL awk、Grep 或其他更简单的东西即可。所以我认为唯一有效的答案是 collect。如果 collect 对你来说太大了,而你只需要样本,请使用 take 或 head 或类似的函数,如下所述。 - eshalev

16
你可以将你的RDD转换为一个DataFrame,然后使用show()来展示它。
// For implicit conversion from RDD to DataFrame
import spark.implicits._

fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)])

// convert to DF then show it
fruits.toDF().show()

这将显示您的数据的前20行,因此您的数据大小不应成为问题。

+------+---+                                                                    
|    _1| _2|
+------+---+
| apple|  1|
|banana|  2|
|orange| 17|
+------+---+

1
我认为它是 import spark.implicits._ - Ryan Hartman
这里使用了哪个库?我在Spark范围内既无法检测到toDF,也无法检测到spark.implicits._ - Sergii

13

如果您正在集群上运行此代码,则 println 不会将输出打印回您的环境。您需要将 RDD 数据传递到会话中才能进行打印。要实现这一点,您可以强制将其转换为本地数组然后进行打印:

linesWithSessionId.toArray().foreach(line => println(line))

2
c.take(10)

较新版本的Spark可以很好地显示表格。

1
在 Python 中
   linesWithSessionIdCollect = linesWithSessionId.collect()
   linesWithSessionIdCollect

这将打印出RDD的所有内容


1
谢谢,但是我用Scala而不是Python标记了这个问题。 - blue-sky

1

myRDD.foreach(println)myRDD.collect().foreach(println)(不仅是'collect',还有其他操作)之间可能存在许多架构差异。我看到的一个差异是,在执行myRDD.foreach(println)时,输出将以随机顺序呈现。例如:如果我的rdd来自每行都有数字的文本文件,则输出将具有不同的顺序。但是当我执行myRDD.collect().foreach(println)时,顺序保持与文本文件一样。


1

不必每次都手动输入,可以:

[1] 在Spark Shell中创建一个通用的打印方法。

def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println)

[2] 或者更好的方法是,使用隐式转换,您可以将函数添加到RDD类中以打印其内容。

implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) {
    def print = rdd.foreach(println)
}

使用示例:

val rdd = sc.parallelize(List(1,2,3,4)).map(_*2)

p(rdd) // 1
rdd.print // 2

输出:

2
6
4
8

重要提示

只有在本地模式下且使用小型数据集时才有意义。否则,您将无法在客户端看到结果或因大型数据集结果而耗尽内存。


0

用Java语法:

rdd.collect().forEach(line -> System.out.println(line));

0

您还可以将其保存为文件:rdd.saveAsTextFile("alicia.txt")


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接