Spark的ssc.textFileStream无法从目录流式传输任何文件

16

我正在尝试使用 Eclipse(具有 Maven 配置),使用 2 个工作器,每个工作器都有 2 个核心,或者也尝试使用 spark-submit 执行以下代码。

public class StreamingWorkCount implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });

        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

那段代码的日志

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s
Sentences Collected from files []
-------------------------------------------
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
Time: 1421944033000 ms
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s)
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

问题是,我无法从目录中的文件中获取数据。请帮忙。


在Windows机器上遇到完全相同的问题,请建议。 - Gaurav Khare
我认为这只适用于HDFS,而不适用于本地文件系统。 - Gaurav Khare
请参考以下问题(及答案):https://stackoverflow.com/questions/33704326/spark-filestreaming-issue - Oliver Hummel
7个回答

13

尝试使用另一个目录,然后在作业运行时将这些文件复制到该目录。


是的,我也尝试了另一个目录。我不明白问题出在哪里,也不知道如何调试,甚至日志中也没有显示。 - Kaushal
1
但是您开始任务时目录是否为空? - pzecevic
实际上有一些文件已经存在,当我开始工作时,我也会复制一些文件。 - Kaushal
2
在程序运行时,应该使用HDFS并将文件复制到其中。 - pzecevic
1
是的,@pzecevic 您是正确的。Spark 只处理那些在作业执行后复制到 HDFS 中的文件,它不会读取之前存在于目录中的文件。 - Kaushal
显示剩余2条评论

6

我有同样的问题。

lines = jssc.textFileStream("file:///Users/projects/spark/test/data");

TextFileSTream非常敏感;我最终做的是:

1. Run Spark program
2. touch datafile
3. mv datafile datafile2
4. mv datafile2  /Users/projects/spark/test/data

就这样做到了。


我正在使用Windows。代码如下:lines = jssc.textFileStream("file:///c:/data"); lines.foreachRDD(file => { file.foreach(fc => { println(fc) }) }) 但是我没有得到输出,该怎么解决? - Gnana

2

我认为你需要在路径前面添加方案,即file://hdfs://


撤销我的评论编辑,因为:实际上需要在路径前面添加file://hdfs://,因此总路径变成file:///tmp/file.txthdfs:///user/data。如果配置中没有设置NameNode,则后者需要是hdfs://host:port/user/data


1
使用HDFS时,它可以工作,但是当我使用本地文件系统并在前面加上'file:///'(Spark不支持file://)前缀时,它无法工作。 - Kaushal
1
这可能是因为您正在使用集群,指定的路径必须可被所有Spark执行器访问,即仅Spark驱动程序可以访问是不够的。 - tgpfeiffer

0

你必须注意,Spark Streaming 只会读取目录中的新文件,而不是已更新的文件(一旦它们在目录中),并且它们都必须具有相同的格式。

源代码


0
JavaDoc建议仅流式处理新文件的函数。

Ref: https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html#textFileStream(java.lang.String)

创建一个输入流,监控Hadoop兼容文件系统中的新文件,并将它们作为文本文件读取(使用LongWritable作为键,Text作为值,TextInputFormat作为输入格式)。文件必须通过从同一文件系统中的另一个位置“移动”到监视目录来写入。以.开头的文件名将被忽略。

0

textFileStream 只能在文件夹中的文件被添加更新时监视该文件夹。

如果您只想读取文件,可以使用SparkContext.textFile


0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接