Flink - 没有适用于方案"hdfs"的文件系统

7

我目前正在开发一个Flink 1.4应用程序,从Hadoop集群中读取一个Avro文件。然而,在我的IDE中以本地模式运行它是完全正常的。但是当我将其提交到Jobmanager Flink时,总是会出现以下错误信息:

java.io.IOException: Error opening the Input Split hdfs://namenode/topics/CaseLocations/partition=0/CaseLocations+0+0000155791+0000255790.avro [0,16549587]: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:110)
at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:54)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:864)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop File System abstraction does not support scheme 'hdfs'. Either no file system implementation exists for that scheme, or the relevant classes are missing from the classpath.
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:102)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
... 2 more
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:99)
... 3 more

我正在使用官方的Flink Docker镜像 flink:1.4.0-hadoop28-scala_2.11 来运行集群,该镜像应该已经包含了Hadoop分发版。

我还尝试将依赖项添加到我的应用程序JAR文件中,但也没有帮助。这里是我的sbt依赖项:

val flinkVersion = "1.4.0"
val hadoopVersion = "2.8.1"
val providedDependencies = Seq(
    "org.apache.flink" %% "flink-clients" % flinkVersion,
    "org.apache.flink" %% "flink-scala" % flinkVersion,
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
val compiledDependencies = Seq(
    "org.apache.flink" % "flink-hadoop-fs" % flinkVersion,
    "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion,
    "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
    "org.apache.flink" % "flink-avro" % flinkVersion,
    "org.apache.flink" %% "flink-table" % flinkVersion,
    "org.scalaj" %% "scalaj-http" % "2.2.1"
)

同时,文件系统类被包含在我的 META-INF/services/org.apache.hadoop.fs.FileSystem 中。
我是否遗漏了什么?官方文档对我没有帮助。
提前致谢。

“Could not find a file system implementation for scheme 'hdfs'”... Flink 是否有用于 Hadoop 配置的 core-site.xml 文件? - OneCricketeer
包括 hadoop-hdfs 和 hadoop-common 应该解决了这个问题。 - kinkajou
1
在查看了Jobmanager和Taskmanager的日志之后,我发现两者的日志文件都显示:无法加载文件系统:java.util.ServiceConfigurationError:org.apache.hadoop.fs.FileSystem:提供者org.apache.hadoop.hdfs.DistributedFileSystem不是子类型。你有什么想法,这是什么原因造成的? - Mr. M
请将日志级别更改为调试模式。 - kinkajou
1
同样的问题在这里。 - juanmirocks
env.setStateBackend(new RocksDBStateBackend("hdfs://localhost:9000/user/rguo/checkpoints1", true)) 出现以下错误:Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 无法找到方案“hdfs”的文件系统实现。Flink不直接支持该方案,并且无法加载支持此方案的Hadoop文件系统。org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:443) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:399) - Ricc
2个回答

3

首先,您需要一个HDFS集群。

其次,在FLINK_HOME/lib下检查是否有flink-shaded-hadoop-2-uber-xxx.xx.jar。

如果您计划与Apache Hadoop一起使用Apache Flink(在YARN上运行Flink,连接到HDFS,连接到HBase或使用基于Hadoop的文件系统连接器),则选择捆绑匹配Hadoop版本的下载,下载匹配您版本的可选预打包Hadoop并将其放置在Flink的lib文件夹中,或导出您的HADOOP_CLASSPATH。


我遇到了类似的问题。因此,我安装了带有Hadoop的Flink版本,并将hadoop_conf_dir添加为环境变量。然后,我就能够在本地Flink实例上提交我的作业了。 - Pritam Sadhukhan

0

今天我遇到了同样的问题,通过以下两个步骤解决了它:

  1. 检查 HADOOP_CONF_DIR(或 HADOOP_HOME、HADOOP_CLASSPATH)是否正确配置。
  2. 检查 FLINK_HOME/lib 目录下是否有 flink-shaded-hadoop-2-uber-xxx.jar 文件,如果没有,则从 这里 下载。

如果以上两个步骤都无法解决问题,您可能需要重新启动 Flink 集群 :)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接