Spark File Streaming: Get File Names
I need to know the file name of each input file streamed from the input directory.

Below is my Spark file streaming code in Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object FileStreamExample {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder.master("local").getOrCreate()

    val input_dir = "src/main/resources/stream_input"
    val ck = "src/main/resources/chkpoint_dir"

    //create stream from folder
    val fileStreamDf = sparkSession.readStream.csv(input_dir)

    def fileNames() = fileStreamDf.inputFiles.foreach(println(_))

    println("Streaming Started...\n")
    //fileNames() //even here it is throwing the same exception
    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", ck)
      .start()

    fileNames();

    query.awaitTermination()

  }}

But I am facing the exception below while streaming:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]
Chairborne answered 13/10, 2019 at 9:42
You can use the input_file_name() function, defined in org.apache.spark.sql.functions._, to get the name of the file from which each row was read into the DataFrame. Unlike df.inputFiles, which cannot be called on a streaming DataFrame (and is what triggers the AnalysisException in your code), input_file_name() is evaluated per row as the query runs:

sparkSession.readStream.csv(input_dir).withColumn("FileName", input_file_name())
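Putting it together with the question's console sink, a minimal end-to-end sketch might look like the following. Note two assumptions: the streaming CSV source normally requires an explicit schema (or spark.sql.streaming.schemaInference set to true), so a single placeholder column is declared here; and the directory and checkpoint paths are taken from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructType}

object FileStreamWithNames {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").getOrCreate()

    // Placeholder schema: streaming file sources require one up front
    // unless schema inference is explicitly enabled.
    val schema = new StructType().add("value", StringType)

    // input_file_name() is evaluated per row, so it works on
    // streaming DataFrames, unlike df.inputFiles.
    val fileStreamDf = spark.readStream
      .schema(schema)
      .csv("src/main/resources/stream_input")
      .withColumn("FileName", input_file_name())

    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", "src/main/resources/chkpoint_dir")
      .start()

    query.awaitTermination()
  }
}
```

Each console micro-batch then shows the source path in the FileName column alongside the data.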
Hardner answered 15/10, 2019 at 5:30
If we want the size and last-modified date of the file, is anything available for that? – Frizz
If you use Databricks, note that in Databricks SQL and Databricks Runtime 13.1 and above, input_file_name is deprecated. Use the hidden _metadata column (e.g. _metadata.file_name) instead:

df = spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*", "_metadata")  # _metadata is hidden; it must be selected explicitly

https://docs.databricks.com/en/ingestion/file-metadata-column.html#language-python
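The same hidden column works from Scala, and it also carries the attributes asked about in the earlier comment: per the file-metadata documentation linked above, _metadata exposes file_size and file_modification_time in addition to file_name. A hedged sketch, assuming a CSV source with a header row (the path and options are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object MetadataColumnExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").getOrCreate()

    // Select individual fields of the hidden _metadata column.
    // Field names follow the Databricks file-metadata documentation.
    val df = spark.read
      .format("csv")
      .option("header", "true")
      .load("dbfs:/tmp/*")
      .select(
        col("*"),
        col("_metadata.file_name"),
        col("_metadata.file_size"),
        col("_metadata.file_modification_time")
      )

    df.show(truncate = false)
  }
}
```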

Phytosociology answered 2/2, 2024 at 4:23

© 2022 - 2025 — McMap. All rights reserved.