Flink - No FileSystem for scheme: hdfs

I am currently developing a Flink 1.4 application that reads an Avro file from a Hadoop cluster. Running it in local mode from my IDE works perfectly fine, but when I submit it to the Flink JobManager it always fails with the following message:

java.io.IOException: Error opening the Input Split hdfs://namenode/topics/CaseLocations/partition=0/CaseLocations+0+0000155791+0000255790.avro [0,16549587]: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:110)
at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:54)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:864)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop File System abstraction does not support scheme 'hdfs'. Either no file system implementation exists for that scheme, or the relevant classes are missing from the classpath.
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:102)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
... 2 more
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:99)
... 3 more
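
For context, the job reads the file roughly like this (a minimal sketch; reading into GenericRecord is an assumption, the real application may use a specific Avro record class):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment
// The path is resolved through Flink's FileSystem abstraction, which is
// where the 'hdfs' scheme lookup fails on the cluster.
val format = new AvroInputFormat[GenericRecord](
  new Path("hdfs://namenode/topics/CaseLocations/partition=0/CaseLocations+0+0000155791+0000255790.avro"),
  classOf[GenericRecord])
val records = env.createInput(format)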

I am running the cluster with the official Flink Docker image flink:1.4.0-hadoop28-scala_2.11, which should already contain a Hadoop distribution.
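
For reference, the cluster was brought up roughly in the usual way for that official image (a sketch; the container names and the JOB_MANAGER_RPC_ADDRESS wiring are assumptions, not the actual deployment):

docker run -d --name jobmanager \
  -e JOB_MANAGER_RPC_ADDRESS=jobmanager \
  flink:1.4.0-hadoop28-scala_2.11 jobmanager
docker run -d --name taskmanager --link jobmanager:jobmanager \
  -e JOB_MANAGER_RPC_ADDRESS=jobmanager \
  flink:1.4.0-hadoop28-scala_2.11 taskmanager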

I have also tried to add the dependencies to my application jar but that did not help either. Here are my sbt dependencies:

val flinkVersion = "1.4.0"
val hadoopVersion = "2.8.1"
val providedDependencies = Seq(
    "org.apache.flink" %% "flink-clients" % flinkVersion,
    "org.apache.flink" %% "flink-scala" % flinkVersion,
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
val compiledDependencies = Seq(
    "org.apache.flink" % "flink-hadoop-fs" % flinkVersion,
    "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion,
    "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
    "org.apache.flink" % "flink-avro" % flinkVersion,
    "org.apache.flink" %% "flink-table" % flinkVersion,
    "org.scalaj" %% "scalaj-http" % "2.2.1"
)
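
For completeness, the two sequences are wired into build.sbt roughly like this (a minimal sketch; marking the Flink core dependencies as "provided" is an assumption about the intended setup):

// Flink's core libraries are expected on the cluster's classpath at runtime.
libraryDependencies ++= providedDependencies.map(_ % "provided")
// Everything else is bundled into the application jar.
libraryDependencies ++= compiledDependencies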

Also, the filesystem classes are included in my META-INF/services/org.apache.hadoop.fs.FileSystem.
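
If the jar is built with sbt-assembly, the service files coming from hadoop-common and hadoop-hdfs have to be merged rather than one overwriting the other, roughly like this (a sketch; it assumes the sbt-assembly plugin is already enabled):

assemblyMergeStrategy in assembly := {
  // Concatenate and de-duplicate service registrations so that
  // META-INF/services/org.apache.hadoop.fs.FileSystem keeps the entries
  // from hadoop-common *and* hadoop-hdfs.
  case PathList("META-INF", "services", _*) => MergeStrategy.filterDistinctLines
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}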

Am I missing something? The official documentation couldn't help me.

Thanks in advance

Smoothtongued answered 14/2, 2018 at 10:12 Comment(6)
Could not find a file system implementation for scheme 'hdfs'... Does Flink have a core-site.xml for the Hadoop configuration? – Carnal
Including hadoop-hdfs and hadoop-common should have solved the issue. – Pinzler
I checked the logs of both the Jobmanager and the Taskmanager, and in both cases the log files say: Cannot load filesystem: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider org.apache.hadoop.hdfs.DistributedFileSystem not a subtype. Any idea what causes this? – Smoothtongued
Please change the log level to debug. – Pinzler
Same issue here. – Overarch
env.setStateBackend(new RocksDBStateBackend("hdfs://localhost:9000/user/rguo/checkpoints1", true)) fails with: Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:443) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:399) – Guitarist

First, you need a running HDFS cluster.

Second, check that flink-shaded-hadoop-2-uber-xxx.xx.jar is present under FLINK_HOME/lib.

If you plan to use Apache Flink together with Apache Hadoop (run Flink on YARN, connect to HDFS, connect to HBase, or use a Hadoop-based file system connector), then either select a Flink download that bundles the matching Hadoop version, download the optional pre-bundled Hadoop that matches your version and place it in Flink's lib folder, or export your HADOOP_CLASSPATH, as sketched below.
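
A minimal sketch of the HADOOP_CLASSPATH route, assuming the hadoop CLI is installed on every machine that runs a Flink process:

# Make Hadoop's jars (including the HDFS client) visible to Flink.
export HADOOP_CLASSPATH=$(hadoop classpath)
# Restart the Flink processes from the same environment so they pick it up.
$FLINK_HOME/bin/start-cluster.sh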

Nekton answered 4/6, 2019 at 2:11 Comment(1)
I am facing a similar issue. So I installed the Flink version with Hadoop and added HADOOP_CONF_DIR as an environment variable. Then I am able to submit my jobs to a local Flink instance. – Butanone

I ran into the same problem today and fixed it in two steps:

  1. Check that HADOOP_CONF_DIR (or HADOOP_HOME, HADOOP_CLASSPATH) is configured correctly.
  2. Check that FLINK_HOME/lib contains the flink-shaded-hadoop-2-uber-xxx.jar; if not, download the matching pre-bundled Hadoop jar and place it there.

If it still does not work after these two steps, you may need to restart the Flink cluster. :) A quick way to verify both steps is sketched below.
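
A quick check for both steps on each machine (a sketch; paths assume a standard Flink installation):

# Step 1: the Hadoop config dir should point at the cluster's *-site.xml files.
echo $HADOOP_CONF_DIR
ls $HADOOP_CONF_DIR/core-site.xml $HADOOP_CONF_DIR/hdfs-site.xml

# Step 2: the shaded Hadoop uber jar should sit in Flink's lib folder.
ls $FLINK_HOME/lib | grep flink-shaded-hadoop

# Restart so the JobManager and TaskManagers pick the changes up.
$FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh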

Visit answered 2/3, 2022 at 12:15 Comment(0)
