Spark:仅在路径存在时读取文件

31

我试图在 Scala 中读取 Sequence 路径下的文件。以下是示例(伪)代码:

val paths = Seq[String] //Seq of paths
val dataframe = spark.read.parquet(paths: _*)

在上面的序列中,有些路径存在,而有些路径不存在。在读取parquet文件时,是否有办法忽略缺失的路径(以避免org.apache.spark.sql.AnalysisException: Path does not exist)?

我尝试过以下方法,似乎可以解决问题,但是最终会读取相同的路径两次,这是我想要避免的:

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)

我查看了DataFrameReaderoptions方法,但似乎没有类似于ignore_if_missing的选项。

此外,这些路径可以是hdfss3(这个Seq作为方法参数传递),在读取时,我不知道一个路径是s3还是hdfs,因此无法使用s3hdfs特定的API来检查其是否存在。

5个回答

24

您可以像@Psidom的答案中那样过滤掉不相关的文件。在Spark中,最好的方法是使用内部Spark Hadoop配置。假设Spark会话变量名为“spark”,则可以执行以下操作:

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def testDirExist(path: String): Boolean = {
  val p = new Path(path)
  hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory
}
val filteredPaths = paths.filter(p => testDirExists(p))
val dataframe = spark.read.parquet(filteredPaths: _*)

5
根据您的系统设置,您可能需要在获取时指定文件系统位置:FileSystem.get(new URI("s3://bucket"), spark.sparkContext.hadoopConfiguration)。否则,它可能会创建一个HDFS文件系统并在检查S3文件系统路径时出错。 - Azuaron

3

首先,过滤一下paths

paths.filter(f => new java.io.File(f).exists)

例如:
Seq("/tmp", "xx").filter(f => new java.io.File(f).exists)
// res18: List[String] = List(/tmp)

1
“Paths” 可以是本地的 hdfs 路径或 s3 路径。不确定 File.exists 是否适用于 s3 - Darshan Mehta
1
如果路径是HDFS / S3路径(这些路径通常与Spark一起使用),则需要稍微不同的API来检查路径是否存在。[@DarshanMehta你比我快了3秒:)] - Tzach Zohar
@TzachZohar 哈哈,是的。我现在已经更新了问题。 - Darshan Mehta
2
对于S3,您可能需要检查doesObjectExist,而对于hdfs,您可以参考这个答案 - Psidom

2

7
这似乎仅适用于文件。如果路径是一个不存在的目录,则不会产生任何效果。 - Beni Murza
1
截至Spark 3.3.1,spark.sql.files.ignoreMissingFiles 用于在数据框构建后丢失的文件:https://github.com/apache/spark/blob/v3.3.1/docs/sql-data-sources-generic-options.md。 - aparkerlue

1

PySpark 3.1或更高版本

很遗憾,在Spark 3.1中,pyspark尚未提供任何标志来忽略它们(至少我不知道)。但您可以尝试这些简单的方法。好消息是,负载接口也适用于列表。请参见下面的示例。

# add you list of paths here
addrs = ["path1", "path2", ...]

# check if they exists, update the list
for add in addrs:
    try:
        spark.read.format("parquet").load(add)
    except:
        print(add)
        addrs.remove(add)

# read the updated list now
sdf_a = spark\
        .read\
        .format("parquet")\
        .load(addrs)

他们拥有读取、写入功能,但没有存在检查,这让我非常生气,我要给这个答案点踩。抱歉。 - Famous Jameis
2
这个答案对我来说似乎很符合“Pythonic”的风格——它不需要我们知道为什么读取失败或如何评估它是否会失败,只需要知道它确实失败了。由于Spark是惰性的,这些加载对运行时应该不会造成重大负担。我也喜欢我不需要学习关于HDFS或s3的任何新知识。 - John Haberstroh
尝试过这个方法,但有时会有漏网之鱼。 - Wildhammer

0

@s510的回答采用了一种Pythonic的“鸭子类型”风格,非常不错。然而,我更喜欢在可能的情况下使用不可变性,所以我会像这样重写它:

def path_is_readable(x):
  try:
    spark.read.parquet(x)
    return True
  except:
    return False

valid_paths = [p for p in paths if path_is_readable(p)]
dataframe = spark.read.parquet(*valid_paths)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接