Spark在标准输出中丢失println()

19

我有如下代码:

val blueCount = sc.accumulator[Long](0)
val output = input.map { data =>
  for (value <- data.getValues()) {
    if (record.getEnum() == DataEnum.BLUE) {
      blueCount += 1
      println("Enum = BLUE : " + value.toString()
    }
  }
  data
}.persist(StorageLevel.MEMORY_ONLY_SER)

output.saveAsTextFile("myOutput")

那么blueCount不为零,但是我没有得到println()输出!我有什么遗漏吗?谢谢!

2个回答

20

这是一个概念性问题...

假设您有一个由许多工作节点组成的大集群,假设有n个工作节点,这些工作节点存储了RDDDataFrame的分区,想象一下,您在该数据上启动了一个map任务,在该map中有一个print语句,首先:

  • 数据将在哪里打印出来?
  • 哪个节点具有优先级以及哪个分区?
  • 如果所有节点都并行运行,谁会先打印?
  • 如何创建此打印队列?

这些是太多的问题,因此apache-spark的设计者/维护者决定在任何map-reduce操作(包括accumulators和甚至broadcast变量)中逻辑上放弃对print语句的支持。

这也是有道理的,因为Spark是一种针对非常大型数据集设计的语言。尽管打印对于测试和调试可能很有用,但您不会想要打印DataFrame或RDD的每一行,因为它们被构建为具有数百万或数十亿行!那么为什么要处理这些复杂的问题,当您一开始就不想打印呢?

为了证明这一点,您可以运行此Scala代码:

// Let's create a simple RDD
val rdd = sc.parallelize(1 to 10000)

def printStuff(x:Int):Int = {
  println(x)
  x + 1
}

// It doesn't print anything! because of a logic design limitation!
rdd.map(printStuff)

// But you can print the RDD by doing the following:
rdd.take(10).foreach(println)

15
我相信println的工作非常好:它只是将内容输出到运行Spark执行器的计算机上的stdout / stderr。除非你有一种捕获那些日志的方法,否则你永远看不到它们。然而,如果你正在使用yarn,那么有一个命令可以将所有内容打印出来。 - David
1
虽然参数是有效的,但Spark不执行任何类型的静态分析来删除代码。输出只是不像@David所解释的那样进入驱动程序的STDOUT。 - user6022341

0

我通过创建一个实用函数来解决了这个问题:

object PrintUtiltity {
    def print(data:String) = {
      println(data)
    }
}

因为Spark认为它正在调用一个实用函数而不是调用打印函数。显然,Spark没有(也不可能实际上)检查其实用函数中的每一行代码。 - Edamame
5
你正在程序中实例化一个对象。如果你没有清楚地了解实例化的过程,我不会保证这种行为能够持续下去。任何对程序或PrintUtility对象调用方式的更改都可能导致行为出现不可预测的变化。如果你想收集日志信息,请使用标准方法来进行操作,不要发明你不理解的随意机制。你所提供的关于为什么它可以工作的解释是非常错误的——没有禁止你所做的事情;也没有代码检查器来确保你不会作弊:所有的行为都遵循系统设计。 - David

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接