如何在Spark streaming中创建停止条件?

3
我希望使用Spark Streaming从HDFS读取数据。另一个程序将不断向HDFS目录上传新文件,而我的Spark流处理作业将处理这些文件。但是,我还希望有一个结束条件。也就是说,上传文件到HDFS的程序可以向Spark Streaming程序发出信号,表示它已经完成了所有文件的上传。
此处的示例程序为例。下面展示代码。假设另一个程序正在上传这些文件,如何通过该程序以编程方式向Spark Streaming程序发出结束条件的信号(不需要按CTRL+C键)?
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage StreamingWordCount <input-directory> <output-directory>")
      System.exit(0)
    }
    val inputDir=args(0)
    val output=args(1)
    val conf = new SparkConf().setAppName("Spark Streaming Example")
    val streamingContext = new StreamingContext(conf, Seconds(10))
    val lines = streamingContext.textFileStream(inputDir)
    val words = lines.flatMap(_.split(" "))
    val wc = words.map(x => (x, 1))
    wc.foreachRDD(rdd => {
      val counts = rdd.reduceByKey((x, y) => x + y)
      counts.saveAsTextFile(output)
      val collectedCounts = counts.collect
      collectedCounts.foreach(c => println(c))
    }
    )

    println("StreamingWordCount: streamingContext start")
    streamingContext.start()
    println("StreamingWordCount: await termination")
    streamingContext.awaitTermination()
    println("StreamingWordCount: done!")
  }
}

请问您能否在上传作业数据的结尾添加一些控制字节,然后在 Spark Streaming 程序中监视这些字节,并在匹配到这些字节时终止程序?类似于添加 0x1c0x0d 这样的内容。另外,为什么要使用 Spark Streaming 处理此用例,而不是在上传文件后启动另一个作业呢? - pjames
1个回答

3

好的,我明白了。基本上你需要创建另一个线程来调用ssc.stop(),以发出停止流处理的信号。例如,像这样。

val ssc = new StreamingContext(sparkConf, Seconds(1))
//////////////////////////////////////////////////////////////////////
val thread = new Thread 
{
    override def run 
    {
        ....
        // On reaching the end condition
        ssc.stop()
    }
}
thread.start
//////////////////////////////////////////////////////////////////////
val lines = ssc.textFileStream("inputDir")
.....

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接