Spark GC overhead limit exceeded error message

I am running the following code in Spark to compare data stored in a CSV file against data stored in a Hive table. My data file is about 1.5 GB in size and has roughly 2 billion rows. When I run the code below I get a "GC overhead limit exceeded" error. I am not sure why this happens, and I have already gone through various articles.
The error occurs at the Test 3 step, sourceDataFrame.except(targetRawData).count > 0. I am not sure whether there is a memory leak. How can I debug and resolve this?
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

   // set source and target location
    //val sourceDataLocation = "hdfs://localhost:9000/sourcec.txt"
    val sourceDataLocation = "s3a://rbspoc-sas/sas_valid_large.txt"
    val targetTableName = "temp_TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)


    // Add the test cases here

    // Test 1 - Validate the Structure
       println("Validating the table structure...")
       var startTime = getTimestamp()
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       val sourceData = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
       .map(_.split(",").toList)
       .map(row)

       val sourceDataFrame = spark.createDataFrame(sourceData,schema)
       //val sourceDataFrame = sourceDataFrame.toDF(sourceDataFrame.columns map(_.toLowerCase): _*)

       val sourceSchemaList = flatten(sourceDataFrame.schema).map(r => r.dataType.toString).toList
       val targetSchemaList = flatten(targetRawData.schema).map(r => r.dataType.toString).toList
       var endTime = getTimestamp()
       if (sourceSchemaList.diff(targetSchemaList).length > 0) {
           println("Updating StructureValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed StructureValidation. ")
           // Force exit here if needed
          // sys.exit(1)
       } else {
           println("Updating StructureValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed StructureValidation. ")
       }

    // Test 2 - Validate the Row count
       println("Validating the Row count...")
       startTime = getTimestamp()
       // check the row count.
       val sourceCount = sourceData.count()
       val targetCount = targetRawData.count()
       endTime = getTimestamp()
       if (sourceCount != targetCount){
           println("Updating RowCountValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
           // Force exit here if needed
           //sys.exit(1)
         }
       else{
           println("Updating RowCountValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
         }


    // Test 3 - Validate the data
    println("Comparing source and target data...")
    startTime = getTimestamp()
    if (sourceDataFrame.except(targetRawData).count > 0 ){
        endTime = getTimestamp()
        // Update the result in the table
        println("Updating DataValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed DataMatch validation")
           // Force exit here if needed
          // sys.exit(1)
         }
       else{
           endTime = getTimestamp()
           println("Updating DataValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed DataMatch validation")
         }

    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - String length validation

  def UpdateResult(tableName: String, startTime: String, endTime: String, returnCode: Int, description: String){
    val insertString = s"INSERT INTO TABLE TestResult VALUES( FROM_UNIXTIME(UNIX_TIMESTAMP()),'$startTime','$endTime','$tableName',$returnCode,'$description')"
    val a = hc.sql(insertString)

    }


  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
        )
    )
  }

  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToDate(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
    }


  def convertToTimestamp(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

   def convertToDate(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

    def flatten(scheme: StructType): Array[StructField] = scheme.fields.flatMap { f =>
      f.dataType match {
      case struct:StructType => flatten(struct)
      case _ => Array(f)
       }
      }

    def getTimestamp(): String = {
        val now = java.util.Calendar.getInstance()
        val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        timestampFormat.format(now.getTime())
    }

Here is the exception:

17/12/21 05:18:40 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(8,0,ShuffleMapTask,TaskKilled(stage cancelled),org.apache.spark.scheduler.TaskInfo@78db3052,null)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 8.0 failed 1 times, most recent failure: Lost task 17.0 in stage 8.0 (TID 323, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2430)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2429)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
  ... 53 elided
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

scala> 17/12/21 05:18:40 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
java.io.IOException: Failed to delete: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

It looks like you are running your Spark job in local mode, so all of the execution happens in the driver JVM. When you call sourceDataFrame.except(targetRawData).count, computing the difference between the source DataFrame and the target raw data and then counting it all happens in that same driver JVM, which cannot hold all of the data in a single JVM. Running it in "yarn-cluster" mode should fix the problem. - Amit Kumar
Alternatively, try increasing the driver memory by setting the "--driver-memory" option. By default it allocates 2 GB; increase it and run again. - Amit Kumar
Try increasing the Spark shell's driver memory with ./bin/spark-shell --driver-memory 5g. - Amit Kumar
Thanks Amit, I will try that. In the meantime, do you know whether except() uses driver memory or executor memory for its computation? I am running on a single-node cluster with 8 GB of RAM in total, of which about 2-3 GB is used by the Hadoop services, so I am wondering whether I should give more memory to the driver or to the executor. - Arun
Where Spark transformations execute depends on the deploy mode you use. In "local" mode everything runs on the driver; in "yarn-client" or "yarn-cluster" mode the Spark driver spawns executors that compute the tasks. - Amit Kumar
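
Since where except() runs depends on the deploy mode, it can help to confirm from the shell which master the session is using and how much driver heap it actually has before tuning anything. A minimal sketch, assuming the same spark-shell session as in the question (the printed values depend on how the shell was launched):

    println(spark.sparkContext.master)                       // e.g. "local[*]" vs "yarn"
    println(spark.sparkContext.getConf.get("spark.driver.memory", "1g (default)"))
    println(Runtime.getRuntime.maxMemory / (1024 * 1024) + " MB max driver heap")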
1 Answer


Your Spark process is spending too much time in garbage collection: most of the CPU is being consumed by GC and the processing cannot complete, because your executor is running out of memory. You can try the following options:

  • Tune the properties spark.storage.memoryFraction and spark.memory.storageFraction. You can also adjust the memory on the command line, e.g. spark-submit ... --executor-memory 4096m --num-executors 20.
  • Or address it by changing the GC policy: check the current GC settings and switch to the G1 collector (-XX:+UseG1GC); see the sketch below.
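
As a rough illustration of those two suggestions, here is a minimal Scala sketch, assuming the job is packaged and submitted as its own application rather than pasted into spark-shell; the application name and the concrete values are placeholders, not taken from the question:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("CsvHiveValidation")                               // placeholder name
      // Leave more of the unified memory pool to execution (storageFraction defaults to 0.5).
      .config("spark.memory.storageFraction", "0.3")
      // Ask the executor JVMs to use the G1 collector. In local mode there are no separate
      // executors, so pass the flag to the driver at launch instead, e.g.
      //   spark-shell --driver-memory 5g --driver-java-options "-XX:+UseG1GC"
      .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
      .enableHiveSupport()
      .getOrCreate()

Note that driver memory cannot be raised from inside an already running JVM, which is why the --driver-memory flag on spark-shell/spark-submit is the reliable way to set it.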

I am facing a similar situation, even though the Spark UI shows 0 ms spent in garbage collection. - y2k-shubham
Could you share the full stack trace? What you have provided is not enough to track down the problem. - Subash
I later found that the timeout was related to a nested query fired against MySQL through the spark.read.jdbc(..) method. It was similar to this, but not exactly the same. I will either write up a comprehensive description in a blog post or simply answer my original question, covering the problem with reproduction steps and the resolution, and will let you know. - y2k-shubham
