Spark Streaming:如何在Python中获取已处理文件的文件名

3

我对Spark(以及Python)是个新手,如果我错过了一些显而易见的东西,请原谅。

我正在使用Spark和Python进行文件流处理。在第一个示例中,Spark正确地监听给定目录并计算文件中单词出现次数,因此我知道在监听目录方面所有工作都正常。

现在我正在尝试获取已处理用于审计目的的文件名。我在这里阅读到 http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E 提到这不是一个简单的任务。我在这里找到了可能的解决方案 http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E 我尝试按照以下方式实现它:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fileName(data):
    string = data.toDebugString

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingFileNamePrinter")
    ssc = StreamingContext(sc, 1)
    lines = ssc.textFileStream("file:///test/input/")
    files = lines.foreachRDD(fileName)
    print(files)
    ssc.start()
    ssc.awaitTermination()

很不幸,现在程序只监听了一次文件夹,输出了“None”,然后就什么也没做。与之前可以正常工作的代码唯一的区别就是

标签。
files = lines.foreachRDD(fileName)

在我考虑获取文件名之前(明天的问题),有人能看出为什么这只检查了一次目录吗?

提前感谢 M


你是在向目录添加文件吗?但是你得到了None,因为你的fileName函数没有返回任何内容...尝试返回data.toDebugString。 - user3689574
此外,您不需要使用foreachRDD,只需在DStream上执行映射,然后pprint即可。 - user3689574
@user3689574 两条有用的建议,谢谢。虽然都没有解决我的问题,但我会发布我找到的解决方案。非常感谢 :) - swinefish
2个回答

3

所以这是一个新手错误。我为自己和其他人提供参考,发布我的解决方案。

正如@user3689574所指出的那样,我没有在函数中返回调试字符串。这完全解释了为什么我会得到“None”。

接下来,我将调试内容放在函数外面打印,这意味着它从未成为foreachRDD的一部分。将其移动到函数内部,如下所示:

def fileName(data):
    debug = data.toDebugString()
    print(debug)

这样做可以按照预期打印出调试信息,并继续监听目录。通过更改,解决了我的初始问题。获取文件名已经变得非常简单。

当目录中没有变化时,调试字符串如下:

(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []

这清楚地表明没有文件。当文件被复制到目录中时,调试输出如下:

(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at testFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt New HadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []

希望这对其他人有所帮助。只需快速使用正则表达式即可轻松获得文件名。

0
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def get_file_info(rdd):
    file_content = rdd.collect()
    file_name = rdd.toDebugString()
    print(file_name, file_content)


def main():
    sc = SparkContext("local[2]", "deneme")
    ssc = StreamingContext(sc, 1)  # One DSTREAM in the same time

    lines = ssc.textFileStream('../urne')
    # here is the call
    lines.foreachRDD(lambda rdd: get_file_info(rdd))

    # Split each line into words
    words = lines.flatMap(lambda line: line.split("\n"))

    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))

    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    wordCounts.pprint()

    ssc.start()
   
    ssc.awaitTermination()
   

if __name__ == "__main__":
    main()

然后,当你得到像这样的一些结果时:
b'(3) MapPartitionsRDD[237] at textFileStream at NativeMethodAccessorImpl.java:0 []
| UnionRDD[236] at textFileStream at NativeMethodAccessorImpl.java:0 []
| file:/some/directory/file0.068513 NewHadoopRDD[231] at textFileStream at NativeMethodAccessorImpl.java:0 []
| file:/some/directory/file0.069317 NewHadoopRDD[233] at textFileStream at NativeMethodAccessorImpl.java:0 []
| file:/some/directory/file0.070036 NewHadoopRDD[235] at textFileStream at NativeMethodAccessorImpl.java:0 []'
['6','3','4','3','6','0','1','7','10','2','0','0','1','1','10','8','7','7','0','8','8','9','7','2','9','1','5','8','9','9','0','6','0','4','3','4','8','5','8','10','5','2','3','6','10','2','1','0','4','3','1','8','2','10','4','0','4','4','1','4','3','1','2','5','5','3']

制作一个正则表达式来获取文件内容和文件名,Spark标记告诉您它有3个文件作为一个DSTREM,因此您可以从那里开始工作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接