Spark Streaming: StreamingContext不能读取数据文件

9
我是一个Spark Streaming的新手,正在尝试使用Spark-shell开始学习它。假设我已经在spark-1.2.0-bin-hadoop2.4的根目录下有一个名为"dataTest"的目录。
我想在shell中测试的简单代码是(在输入$.\bin\spark-shell后):
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(2))
val data = ssc.textFileStream("dataTest")
println("Nb lines is equal to= "+data.count())
data.foreachRDD { (rdd, time) => println(rdd.count()) }
ssc.start()
ssc.awaitTermination()

然后,我将一些文件复制到“dataTest”目录中(并尝试重命名该目录中的某些现有文件)。

但不幸的是,我没有得到我想要的结果(即,我没有得到任何输出,所以看起来ssc.textFileStream不能正确工作),只得到一些类似以下的东西:

15/01/15 19:32:46 INFO JobScheduler: Added jobs for time 1421346766000 ms
15/01/15 19:32:46 INFO JobScheduler: Starting job streaming job 1421346766000 ms
.0 from job set of time 1421346766000 ms
15/01/15 19:32:46 INFO SparkContext: Starting job: foreachRDD at <console>:20
15/01/15 19:32:46 INFO DAGScheduler: Job 69 finished: foreachRDD at <console>:20
, took 0,000021 s
0
15/01/15 19:32:46 INFO JobScheduler: Finished job streaming job 1421346766000 ms
.0 from job set of time 1421346766000 ms
15/01/15 19:32:46 INFO MappedRDD: Removing RDD 137 from persistence list
15/01/15 19:32:46 INFO JobScheduler: Total delay: 0,005 s for time 1421346766000
ms (execution: 0,002 s)
15/01/15 19:32:46 INFO BlockManager: Removing RDD 137
15/01/15 19:32:46 INFO UnionRDD: Removing RDD 78 from persistence list
15/01/15 19:32:46 INFO BlockManager: Removing RDD 78
15/01/15 19:32:46 INFO FileInputDStream: Cleared 1 old files that were older tha
n 1421346706000 ms: 1421346704000 ms
15/01/15 19:32:46 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
7个回答

4
使用命令行复制文件/文档或将文件/文档另存到工作目录中对我很有效。 当您正常地通过IDE复制时,无法影响流上下文监视器修改日期。

3

您是否尝试将文本文件从另一个目录移动到正在监视的目录中?为了使文件流正常工作,您需要将文件原子地放入监视目录中,这样一旦文件在列表中可见,Spark 就可以读取文件中的所有数据(如果您将文件复制到目录中,则可能不是这种情况)。

编程指南的基本源子部分中有详细说明。


我尝试移动、重命名文本文件,但遗憾的是我没有得到任何输出!! 你能行吗? - Momog
我还必须确保格式正确 - 我犯了一个错误,将我的文本文件保存为 .txt.gz 格式,这不起作用。一旦我将其保存为纯文本,它就可以工作了。 - ohad serfaty

1
以下代码适用于我
class StreamingData extends Serializable {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
    //val sc = new SparkContext(conf)
    val ssc = new StreamingContext(conf, Seconds(1))
    val input = ssc.textFileStream("file:///C:/Users/M1026352/Desktop/Spark/StreamInput")
    val lines = input.flatMap(_.split(" "))
    val words = lines.map(word => (word, 1))
    val counts = words.reduceByKey(_ + _)
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

只需要将文本文件保留为Unix格式即可。如果在notepad++中打开文件,请转到设置->首选项->新文档->Unix/OSX,然后更改文件名以使其被Scala选中。https://stackoverflow.com/a/41495776/5196927

请参考上面的链接。


1
我也曾遇到过同样的问题,对我来说解决办法是在流媒体运行时编辑并保存我想要用作输入流的文件。然后,在流媒体仍在运行时,直接将输入文件移动到流媒体目录中。

0

我认为这应该通常可以工作,但问题可能是建议将Spark Streaming作为独立应用程序而不是在spark-shell中运行。

我将其作为独立应用程序(在其他流数据上)运行,并且它可以正常工作。

data.count()会给出DStream中每个RDD中有多少元素,这与您在foreachRDD()中计数的相同。


你测试过什么类型的流数据?它能够处理存在于你硬盘上的文本文件吗?谢谢回复。 - Momog
我之前测试过Twitter流。今天我使用textFileStream测试了一个文件,就像你在示例中所做的那样。我无法在我的Macbook上使用spark-shell使其正常工作。我决定尝试在nfs文件系统上运行,以防Macbook文件系统与hdfs不兼容。我仍然使用本地主节点的spark-shell。我将批处理大小从2增加到10,以便更容易看到日志输出。只要我从同一文件系统的另一个目录中“mv”文件(而不是“cp”),代码就可以在nfs上正常工作。从同一目录移动文件则无法正常工作。 - xyzzy

0

我正在做几乎相同的事情(在Windows 8笔记本电脑上作为独立应用程序),对我来说这很好用,但是我的"dataTest"文件夹位于"bin"的子文件夹中。也许你可以试试这个方法?


更好的做法是在评论区发表此类回复。 - Razib

0

我刚刚在shell中使用了你的代码,它运行得很好。当我将一些文件放到目录(HDFS)中时,我得到了如下输出日志:

15/07/23 10:46:36 INFO dstream.FileInputDStream: Finding new files took 9 ms
15/07/23 10:46:36 INFO dstream.FileInputDStream: New files at time 1437619596000 ms:
hdfs://master:9000/user/jared/input/hadoop-env.sh
15/07/23 10:46:36 INFO storage.MemoryStore: ensureFreeSpace(235504) called with curMem=0, maxMem=280248975
......
15/07/23 10:46:36 INFO input.FileInputFormat: Total input paths to process : 1
15/07/23 10:46:37 INFO rdd.NewHadoopRDD: Input split: hdfs://master:9000/user/jared/input/hadoop-env.sh:0+4387
15/07/23 10:46:42 INFO dstream.FileInputDStream: Finding new files took 107 ms
15/07/23 10:46:42 INFO dstream.FileInputDStream: New files at time 1437619598000 ms:

15/07/23 10:46:42 INFO scheduler.JobScheduler: Added jobs for time 1437619598000 ms
15/07/23 10:46:42 INFO dstream.FileInputDStream: Finding new files took 23 ms
15/07/23 10:46:42 INFO dstream.FileInputDStream: New files at time 1437619600000 ms:

15/07/23 10:46:42 INFO scheduler.JobScheduler: Added jobs for time 1437619600000 ms
15/07/23 10:46:43 INFO dstream.FileInputDStream: Finding new files took 42 ms
15/07/23 10:46:43 INFO dstream.FileInputDStream: New files at time 1437619602000 ms:
15/07/23 10:46:43 INFO scheduler.JobScheduler: Added jobs for time 1437619602000 ms
15/07/23 10:46:43 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1830 bytes result sent to driver
15/07/23 10:46:43 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6098 ms on localhost (1/1)
15/07/23 10:46:43 INFO scheduler.DAGScheduler: ResultStage 0 (foreachRDD at <console>:29) finished in 6.178 s
15/07/23 10:46:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/07/23 10:46:43 INFO scheduler.DAGScheduler: Job 66 finished: foreachRDD at <console>:29, took 6.647137 s
101

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接