Scala 中的 Apache Spark 日志记录

71

我正在寻找一种解决方案,在执行Apache Spark节点上的代码时能够记录附加数据,以便在执行过程中出现问题时进行调查。尝试使用传统解决方案(例如com.typesafe.scalalogging.LazyLogging)会失败,因为像Apache Spark这样的分布式环境无法序列化日志实例。

我已经研究了这个问题,目前我找到的解决方案是使用org.apache.spark.Logging trait,如下所示:

class SparkExample with Logging {
  val someRDD = ...
  someRDD.map {
    rddElement => logInfo(s"$rddElement will be processed.")
    doSomething(rddElement)
  }
}

然而,看起来Logging特征对于Apache Spark并非长久之计,因为它被标记为@DeveloperApi并且类文档中提到:

在未来版本中可能会更改或删除此功能。

我想知道是否有任何已知的日志记录解决方案,可以让我在Apache Spark节点上执行RDD时记录数据?

@后续编辑: 下面的一些评论建议使用Log4J。 我尝试使用Log4J,但是当我从Scala类(而不是Scala对象)使用记录器时仍然遇到问题。 这是我的完整代码:

import org.apache.log4j.Logger
import org.apache.spark._

object Main {
 def main(args: Array[String]) {
  new LoggingTestWithRDD().doTest()
 }
}

class LoggingTestWithRDD extends Serializable {

  val log = Logger.getLogger(getClass.getName)

  def doTest(): Unit = {
   val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
   val spark = new SparkContext(conf)

   val someRdd = spark.parallelize(List(1, 2, 3))
   someRdd.map {
     element =>
       log.info(s"$element will be processed")
       element + 1
    }
   spark.stop()
 }

我看到的异常是:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger


1
好的...配置您的log4j并完成日志记录。 - sarveshseri
1
基本上Apache Spark强制您仅使用log4j吗? - Bogdan N
2
顺便提一下,log4j 的正确拼写是 "slf4j"。 - michael
4
@michael_n,那不正确。log4j和slf4j是两个不同的东西。 - ben_frankly
如果你想要确保某些东西不会改变,并且认为这值得付出努力,那就自己写吧。可能可以使用Akka。但我认为这并不值得 - 如果Spark强制更改代码,只需更改即可。 - BAR
显示剩余6条评论
7个回答

54

您可以使用Akhil在https://www.mail-archive.com/user@spark.apache.org/msg29010.html提出的解决方案。我已经自己使用过了,它很有效。

Akhil Das Mon, 25 May 2015 08:20:40 -0700
可以尝试这种方式:

object Holder extends Serializable {      
   @transient lazy val log = Logger.getLogger(getClass.getName)    
}


val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element =>
   Holder.log.info(element)
}

3
我曾使用Spark Streaming自定义接收器时遇到了空指针异常的问题,花了一整天时间苦思冥想。最终找到了解决方案。感谢您的帮助。 - Rodrigo Del C. Andrade
这似乎是一个解决方法。比如说我想要启用特定模块的日志记录,该怎么做? - Knight71
我在输出日志中看不到任何日志。你能建议我哪里可能出了问题吗? - Shilpa
我在spark-jobserver中仍然无法使用这种确切的方法。有人尝试过这种配置吗?一切看起来都应该可以工作,但是我在闭包内没有看到任何日志记录。 - Justin Standard
你能告诉我getLogger方法从哪里导入的吗? - j pavan kumar

12

使用Log4j 2.x。核心日志记录器已经被序列化,问题解决。

Jira讨论:https://issues.apache.org/jira/browse/LOG4J2-801

"org.apache.logging.log4j" % "log4j-api" % "2.x.x"

"org.apache.logging.log4j" % "log4j-core" % "2.x.x"

"org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"

4
你能否提供完整的日志记录实现方法,例如如何创建 log4j2.properties 文件以及在代码中的具体实现。 - jAi

4

如果您需要在mapfilter或其他RDD函数之前和之后执行一些代码,请尝试使用mapPartition,其中底层迭代器是显式传递的。

示例:

val log = ??? // this gets captured and produces serialization error
rdd.map { x =>
  log.info(x)
  x+1
}

Becomes:

rdd.mapPartition { it =>
  val log = ??? // this is freshly initialized in worker nodes
  it.map { x =>
    log.info(x)
    x + 1
  }
}

每个基本的RDD函数都是用mapPartition实现的。

确保明确处理分区器,不要丢失它:查看Scaladoc,preservesPartitioning参数,这对性能至关重要。


2

这是一篇旧文章,但我想提供我的工作解决方案,我刚刚在努力奋斗后得到了这个解决方案,对其他人仍然有用:

我想在rdd.map函数内部打印rdd内容,但是会出现“Task Not Serializable Error”错误。这是使用扩展java.io.Serializable的scala静态对象解决此问题的方法:

import org.apache.log4j.Level

object MyClass extends Serializable{

val log = org.apache.log4j.LogManager.getLogger("name of my spark log")

log.setLevel(Level.INFO)

def main(args:Array[String])
{

rdd.map(t=>

//Using object's logger here

val log =MyClass.log

log.INFO("count"+rdd.count)
)
}

}

2
使记录器成为暂态和惰性的方法是解决问题的关键。 @transient lazy val log = Logger.getLogger(getClass.getName) @transient 告诉Spark不要将其序列化到所有执行器上,而lazy会在第一次使用时创建实例。换句话说,每个执行器都有自己的记录器实例。即使可以序列化记录器,这也不是一个好主意。
当然,您放入map()闭包中的任何内容都将在执行器上运行,因此将出现在执行器日志中而不是驱动程序日志中。 对于执行器上的自定义log4j属性,您需要将log4j.properties添加到执行器类路径并将log4j.properties发送给执行器。
这可以通过向spark-submit命令添加以下参数来完成: --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties " --files ./log4j.properties 还有其他设置这些配置的方法,但这是最常见的方法。

1
val log = Logger.getLogger(getClass.getName),

你可以使用"log"来记录日志。如果需要更改记录器属性,需要在/conf文件夹中拥有log4j.properties文件。默认情况下,在该位置会有一个模板。

我已经尝试使用log4j,但是当从类(而不是对象)调用日志记录器变量时仍然存在序列化问题:Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger - Bogdan N
3
简单解决方案:在本地方法作用域中声明日志变量。 - nuaavee
2
如果您将“log”设置为@transient,会发生什么? - Mikael Ståldal
Map partition技术可以解决这个问题。您可以在mappartitions函数中创建记录器并使用它。此技术用于JDBC连接/mq/Kafka生产者。 - Ashkrit Sharma

0
这是我的解决方案:
我使用SLF4j(带有Log4j绑定), 在每个Spark作业的基类中,我有类似于以下内容的代码:
import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass) 

就在我在分布式函数代码中使用LOG的地方之前,我将日志记录器引用复制到一个本地常量中。

val LOG = this.LOG

这对我有用!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接