在Pyspark中检查HDFS文件是否存在

5

有人能建议一下在pyspark中检查文件是否存在的最佳方法吗?

目前我正在使用以下方法进行检查,请给予建议。

def path_exist(path):

try:
    rdd=sparkSqlCtx.read.format("orc").load(path)
    rdd.take(1)
    return True

except Exception as e:
    return False
4个回答

10

你可以通过Py4j使用Java API org.apache.hadoop.fs.{FileSystem, Path}

jvm = spark_session._jvm
jsc = spark_session._jsc
fs = jvm.org.apache.hadoop.fs.FileSystem.get(jsc.hadoopConfiguration())
if fs.exists(jvm.org.apache.hadoop.fs.Path("/foo/bar")):
    print("/foo/bar exists")
else:
    print("/foo/bar does not exist")

1
这比subprocess更快。 - SatZ
但我无法在UDF中使用它。有没有办法修改这个函数,以便可以使用UDF和withColumn运行它? - SatZ
从性能和优化的角度来看,哪一个更好使用?这个还是subprocess? - Akshat Chaturvedi
@emeth 你怎么使用通配符让它工作呢?(例如:"/foo/*_bar")。使用Path API会返回false... - manelmc

8

您可以使用subprocess从Python执行HDFS命令:

import subprocess

proc = subprocess.Popen(['hadoop', 'fs', '-test', '-e', path])
proc.communicate()

if proc.returncode != 0:
    print '%s does not exist' % path
else : 
    print '%s exists' % path

参见: Apache Spark - 检查文件是否存在

1
以下代码应该可以工作 -
import subprocess

out=subprocess.check_output("hadoop fs -ls /tmp/file.txt",shell=True)

out=out.strip()

out=out.split("\n")

for l in out:

if l.endswith(".txt"):

print "file exit"
    else:
        print "file not exit"

1
要在Pyspark上检查S3上的文件(类似于@emeth的帖子),您需要向FileSystem构造函数提供URI。
sc = spark.sparkContext
jvm = sc._jvm
conf = sc._jsc.hadoopConfiguration()
url = "s3://bucket/some/path/_SUCCESS"
uri = jvm.java.net.URI(url)
fs = jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
fs.exists(jvm.org.apache.hadoop.fs.Path(url))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接