我对Spark(以及Python)是个新手,如果我错过了一些显而易见的东西,请原谅。
我正在使用Spark和Python进行文件流处理。在第一个示例中,Spark正确地监听给定目录并计算文件中单词出现次数,因此我知道在监听目录方面所有工作都正常。
现在我正在尝试获取已处理用于审计目的的文件名。我在这里阅读到 http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E 提到这不是一个简单的任务。我在这里找到了可能的解决方案 http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E 我尝试按照以下方式实现它:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def fileName(data):
string = data.toDebugString
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingFileNamePrinter")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("file:///test/input/")
files = lines.foreachRDD(fileName)
print(files)
ssc.start()
ssc.awaitTermination()
很不幸,现在程序只监听了一次文件夹,输出了“None”,然后就什么也没做。与之前可以正常工作的代码唯一的区别就是标签。
files = lines.foreachRDD(fileName)
在我考虑获取文件名之前(明天的问题),有人能看出为什么这只检查了一次目录吗?
提前感谢 M