apache spark - check if file exists
I am new to Spark and I have a question. I have a two-step process in which the first step writes a SUCCESS.txt file to a location on HDFS. My second step, which is a Spark job, has to verify that the SUCCESS.txt file exists before it starts processing the data.

I checked the Spark API and didn't find any method that checks whether a file exists. Any ideas how to handle this?

The only method I found was sc.textFile("hdfs:///SUCCESS.txt").count(), which throws an exception when the file does not exist. I would have to catch that exception and write my program accordingly. I didn't really like this approach. Hoping to find a better alternative.
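Roughly, the workaround I mean looks like this (a minimal sketch, assuming an existing SparkContext sc; I'm assuming InvalidInputException is the error raised for a missing input path):

// Sketch of the exception-catching workaround: run an action and
// treat a missing-input-path error as "file absent".
val successExists =
  try {
    sc.textFile("hdfs:///SUCCESS.txt").count()
    true
  } catch {
    case _: org.apache.hadoop.mapred.InvalidInputException => false
  }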

Melloney answered 22/5, 2015 at 20:55 Comment(0)
58

For a file in HDFS, you can use the Hadoop way of doing this:

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path("/path/on/hdfs/to/SUCCESS.txt"))
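The returned Boolean can then gate the second step, for example (an illustrative sketch, not part of the original answer):

// Fail fast if the marker file from the question is missing.
require(exists, "SUCCESS.txt not found on HDFS; aborting the job")
// ...continue with the Spark job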
Cyclograph answered 23/5, 2015 at 2:1 Comment(0)
24

For Pyspark, you can achieve this without invoking a subprocess using something like:

fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
Devine answered 9/2, 2018 at 15:4 Comment(2)
elegant solution! – Mischiefmaker
finally, I found it! – Nesto
11

I would say the best way is to call this through a function that internally checks for the file using the traditional Hadoop file check:

object OutputDirCheck {
  def dirExists(hdfsDirectory: String): Boolean = {
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
    fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
  }
}
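A hypothetical call site, using the marker file from the question:

if (OutputDirCheck.dirExists("/path/on/hdfs/to/SUCCESS.txt")) {
  // marker file is present; safe to start processing
}

Note that new Configuration() only picks up settings found on the classpath (core-site.xml etc.); inside a Spark job you may prefer to pass sc.hadoopConfiguration, as the accepted answer does.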
Cryogen answered 23/11, 2016 at 6:27 Comment(0)
6

Using Databricks dbutils:

def path_exists(path):
    try:
        # ls raises an exception when the path does not exist
        return len(dbutils.fs.ls(path)) > 0
    except Exception:
        return False
Eringo answered 1/5, 2020 at 15:0 Comment(1)
shorter way: def path_exists(path): return len(dbutils.fs.ls(path)) > 0 – Maniemanifest
4

For Spark 2.0 or higher you can use the exists method of org.apache.hadoop.fs.FileSystem:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object Test extends App {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("BigDataETL - Check if file exists")
    .getOrCreate()

  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  // This method returns a Boolean (true if the file exists, false if it doesn't)
  val fileExists = fs.exists(new Path("<path_to_file>"))
  if (fileExists) println("File exists!")
  else println("File doesn't exist!")
}

For Spark 1.6 to 2.0:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object Test extends App {
  val sparkConf = new SparkConf().setAppName(s"BigDataETL - Check if file exists")
  val sc = new SparkContext(sparkConf)
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fileExists = fs.exists(new Path("<path_to_file>"))
  if (fileExists) println("File exists!")
  else println("File doesn't exist!")
}
Maculate answered 4/2, 2020 at 9:5 Comment(0)
3

For PySpark:

from py4j.protocol import Py4JJavaError

def path_exist(path):
    try:
        rdd = sc.textFile(path)
        rdd.take(1)
        return True
    except Py4JJavaError:
        return False
Muscadine answered 15/3, 2019 at 11:20 Comment(0)
2

For Java coders:

SparkConf sparkConf = new SparkConf().setAppName("myClassname");
SparkContext sparky = new SparkContext(sparkConf);
JavaSparkContext context = new JavaSparkContext(sparky);

FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
Path path = new Path(sparkConf.get(path_to_File));

if (!hdfs.exists(path)) {
    // Path does not exist.
} else {
    // Path exists.
}
Lohr answered 19/9, 2017 at 18:51 Comment(1)
For me, sparkConf.get(path_to_File) complains that the file does not exist, which is the thing I was trying to avoid. new Path(path_to_File) directly, however, works. – Marketplace
2

For PySpark Python users:

I didn't find anything built into Python or PySpark for this, so we need to execute the hdfs command from Python code. This has worked for me.

hdfs command to check whether a folder exists (returns 0 if true):

hdfs dfs -test -d /folder-path

hdfs command to check whether a file exists (returns 0 if true):

hdfs dfs -test -e /file-path

To call this from Python, I used the following code:

import subprocess

def run_cmd(args_list):
    # Run a shell command and return its exit code.
    proc = subprocess.Popen(args_list, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    proc.communicate()
    return proc.returncode

cmd = ['hdfs', 'dfs', '-test', '-d', '/folder-path']
code = run_cmd(cmd)
if code == 0:
    print('folder exists')
    print(code)

Output if the folder exists:

folder exists
0

Knighterrant answered 24/10, 2017 at 10:32 Comment(0)
0

@Muscadine's answer catches every Py4JJavaError and treats it as a missing file. I propose adding another step that evaluates the Java exception's error message:

from py4j.protocol import Py4JJavaError


def file_exists(path):
    try:
        spark.sparkContext.textFile(path).take(1)
        return True
    except Py4JJavaError as e:
        if 'org.apache.hadoop.mapred.InvalidInputException: Input path does not exist' in str(e.java_exception):
            return False
        else:
            # some other Java error: the path itself was found
            return True
Inconvincible answered 11/5, 2022 at 15:47 Comment(0)
