Spark Streaming检查点恢复非常缓慢。

21
  • 目标:通过Spark Streaming从Kinesis读取数据,并将数据以Parquet格式存储到S3中。
  • 情况: 应用程序最初运行良好,每次处理1小时的批次,平均处理时间不到30分钟。出现某些故障导致应用程序崩溃后,我们尝试从检查点重新启动。处理现在需要永远的时间,无法向前移动。 我们尝试以1分钟的批次间隔进行相同的测试,处理运行正常,需要1.2分钟才能完成批处理。当我们从检查点恢复时,每个批次需要约15分钟。
  • 注意事项: 我们使用s3进行检查点 每个执行器使用1个executor,19g内存和3个核心

附加截图:

首次运行-检查点恢复之前 Before checkpoint - Streaming Page

Before checkpoint - Jobs Page

Before checkpoint - Jobs Page2

尝试从检查点恢复: After checkpoint - Streaming Page After checkpoint - Jobs Page

Config.scala

object Config {

  val sparkConf = new SparkConf


  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)


  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = {
    this.sc
  }

  def getSqlContext(): HiveContext = {
    this.sqlContext
  }





}

S3Basin.scala

object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)
  }

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={
    streams.foreachRDD(jsonRDDRaw =>{
      println(s"Old partitions ${jsonRDDRaw.partitions.length}")
      val jsonRDD = jsonRDDRaw.coalesce(10,true)
      println(s"New partitions ${jsonRDD.partitions.length}")

      if(!jsonRDD.isEmpty()){
        val sqlContext =  SQLContext.getOrCreate(jsonRDD.context)

        sqlContext.read.json(jsonRDD.map(f=>{
          val str = new String(f)
          if(str.startsWith("{\"message\"")){
            str.substring(11,str.indexOf("@version")-2)
          }
          else{
            str
          }
        })).registerTempTable("events")

        sqlContext.sql(
          """
            |select
            |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
            |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
            |*
            |from events
          """.stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)


        sqlContext.dropTempTable("events")
      }
    })
  }
}

Kinesis.scala

object Kinesis{


  def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = {
    val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1)))   // new context
    streamingContext.checkpoint(Config.checkpointDirectory)   // set checkpoint directory
    val sc = Config.getSparkContext

    var awsCredentails : BasicAWSCredentials = null
    val kinesisClient = if(Config.useIAMInstanceRole){
      new AmazonKinesisClient()
    }
    else{
      awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
      new AmazonKinesisClient(awsCredentails)
    }


    val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
    val appName = sc.hadoopConfiguration.get("kinesis.appName")

    val streamName = sc.hadoopConfiguration.get("kinesis.streamName")

    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))

    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = sc.hadoopConfiguration.get("kinesis.regionName")


    val kinesisStreams = (0 until Config.numStreams).map { i =>
        println(s"creating stream for $i")
        if(Config.useIAMInstanceRole){
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

        }else{
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)

        }
      }

    val unionStreams = streamingContext.union(kinesisStreams)
    streamFunc(unionStreams)

    streamingContext
  }


  def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = {

    val sc = Config.getSparkContext

    if(sc.defaultParallelism < Config.numStreams+1){
      throw  new Exception(s"Number of shards = ${Config.numStreams} , number of processor = ${sc.defaultParallelism}")
    }

    val streamingContext =  StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))


//    sys.ShutdownHookThread {
//      println("Gracefully stopping Spark Streaming Application")
//      streamingContext.stop(true, true)
//      println("Application stopped greacefully")
//    }
//

    streamingContext.start()
    streamingContext.awaitTermination()


  }




}

DAG DAG

enter image description here


图片很好,但我们能看到你的代码吗?最好是你的Spark DAG。 - Yuval Itzchakov
2
这绝对是我在SO上看过的最有视觉吸引力的帖子。感谢所有的截图 =D - Kristian
@interfector,你找到解决方案了吗? - Gaurav Shah
@GauravShah,我只能看到空白的区域,没有下面的图片。第一次运行 - 在检查点恢复之前,在检查点之前 - 流媒体页面。 - Amit Kumar
1
@amit_kumar,我没有看到图片的问题。这可能是你本地浏览器的问题。回到问题上,你是否尝试过查看服务器是否在交换内存?即使有19G的内存,它可能会做一些意外的事情。 - Daniel Wisehart
显示剩余12条评论
3个回答

4

@interfector 你可能会对这个感兴趣。 - Gaurav Shah
太棒了,你做得非常出色,甚至还修复了它,@gaurav-shah! - Yash Sharma

1
当一个失败的驱动程序重新启动时,会发生以下情况:
  1. 恢复计算 - 使用检查点信息来重新启动驱动程序,重构上下文并重新启动所有接收器。
  2. 恢复块元数据 - 将需要继续处理的所有块的元数据恢复。
  3. 重新生成不完整的作业 - 对于由于故障而未完成处理的批次,使用恢复的块元数据重新生成RDD和相应的作业。
  4. 读取保存在日志中的块 - 当执行这些作业时,块数据直接从预写式日志中读取。这样可以恢复可靠保存到日志中的所有必要数据。
  5. 重新发送未确认的数据 - 在故障发生时未保存到日志中的缓冲数据将由源再次发送,因为它尚未被接收器确认。

enter image description here 由于所有这些步骤都在驱动程序上执行,因此您的 0 个事件的批处理需要很长时间。这只会发生在第一批,然后事情就会正常化。

参考 这里

检查点非常慢,你尝试过使用KryoSerialization吗?另外考虑使用Datasets https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html,它具有更快的序列化和反序列化速度,并且带有编码器。 - Amit Kumar
谢谢@amit_kumar,我在Spark 2上尝试了同样的事情,但遇到了类似的问题。 - Gaurav Shah
我正在运行Spark 2.0并使用数据集 - 我看到完全相同的问题。 - Glennie Helles Sindholt
@GlennieHellesSindholt 我认为我们应该为Spark创建一个Jira问题,他们应该能够提供帮助。 - Gaurav Shah
1
@GauravShah 你创建了一个Jira问题吗?我也很有兴趣跟进这个工单。在流媒体重启后,我有一个出现在45分钟后的流媒体选项卡,并且在1小时后批处理时间越来越长,导致内存溢出。因为当我不从检查点重新启动时没有内存溢出,所以我想知道背后发生了什么。 - crak
显示剩余5条评论

0

我以前也遇到过类似的问题,我的应用程序变得越来越慢。

尝试在使用rdd后释放内存,调用rdd.unpersist()https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#unpersist(boolean)

或将spark.streaming.backpressure.enabled设置为true

http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval

http://spark.apache.org/docs/latest/streaming-programming-guide.html#requirements

另外,检查您的locality设置,可能有太多数据在移动。


应用程序只在恢复过程中花费时间,而不是在常规处理中。如果我内存不足,rdd.unpersist会有所帮助,但这并不是问题所在。如果我无法像数据进入一样快速消耗数据,那么反压力是有用的,但实际上我可以做到。 - Gaurav Shah

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接