inferSchema=true isn't working for CSV file reading in Spark Structured Streaming
I'm getting the following error message:

java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.

    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:251)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:115)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:115)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
    at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:404)
    at io.sekai.core.streaming.KafkaDataGenerator.readFromCSVFile(KafkaDataGenerator.scala:38)

when I'm loading the CSV file with

spark2
  .readStream
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)

and I have also tried another form of the options:

spark2
  .readStream
  .format("csv")
  .option("inferSchema", value = true)
  .option("header", value = true)
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)

I'd like to infer the schema, so I've commented out the explicit schema usage.

A sample of the CSV file is below:

id,Energy Data,Distance,Humidity,Ambient Temperature,Cold Water Temperature,Vibration Value 1,Vibration Value 2,Handle Movement
1,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
2,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
3,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
4,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
5,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
6,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
7,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
8,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
9,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
10,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2

What is wrong here? I'm following the option documentation strictly, but how does schema inference happen?

Postfree answered 17/10, 2021 at 19:56 Comment(0)

You have 2 options here:

  1. Before running the streaming query, write a sample of your data to the destination once. When you run the streaming query again, the schema will be inferred from it.
  2. Set spark.sql.streaming.schemaInference to true:
spark.sql("set spark.sql.streaming.schemaInference=true")
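Applied to the code in the question, option 2 might look like the sketch below. It assumes a `SparkSession` named `spark2` and an input directory `path`, as in the question:

```scala
// Enable schema inference for streaming file sources (off by default).
spark2.conf.set("spark.sql.streaming.schemaInference", "true")

// No .schema(...) call is needed now; Spark infers it from existing files.
val df = spark2.readStream
  .format("csv")
  .option("header", "true")
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)
```

Note that the config must be set before `readStream` is called, since the schema is resolved when the streaming source is created.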

From the docs:

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.

Multiple answered 18/10, 2021 at 7:2 Comment(0)

The solution is in the error message: "....If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it."

First create the schema:

file_schema = (spark.read
                    .format("csv")
                    .option("inferSchema", True)
                    .option("header", True)
                    .load(directory)
                    .limit(10)
                    .schema)

Then read the stream:

(spark.readStream
      .format("csv")
      .schema(file_schema)
      .load(directory))
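The same two-step pattern in Scala (the question's language) might look like this; `spark` and `path` are assumed to be the existing SparkSession and input directory:

```scala
// Step 1: infer the schema once from a static batch read of the files
// already in the directory.
val fileSchema = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .csv(path)
  .schema

// Step 2: reuse that schema for the streaming read, which requires an
// explicit schema by default.
val streamDf = spark.readStream
  .format("csv")
  .option("header", "true")
  .schema(fileSchema)
  .csv(path)
```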
Criminality answered 13/1 at 15:36 Comment(0)

We must specify the schema when creating a streaming source DataFrame.

From the documentation:

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures.
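Specifying the schema explicitly could look like the sketch below. The column names follow the sample CSV in the question; reading every non-id field as a string is an assumption, since values like "25°C" and "55.50%" would not parse as numbers anyway:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Explicit schema matching the sample CSV header (all measurement
// columns kept as strings because they embed units).
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("Energy Data", StringType, nullable = true),
  StructField("Distance", StringType, nullable = true),
  StructField("Humidity", StringType, nullable = true),
  StructField("Ambient Temperature", StringType, nullable = true),
  StructField("Cold Water Temperature", StringType, nullable = true),
  StructField("Vibration Value 1", StringType, nullable = true),
  StructField("Vibration Value 2", StringType, nullable = true),
  StructField("Handle Movement", StringType, nullable = true)
))

val df = spark.readStream
  .format("csv")
  .option("header", "true")
  .schema(schema)
  .csv(path)
```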

Elnoraelnore answered 18/10, 2021 at 6:57 Comment(2)
The answer https://mcmap.net/q/2022357/-inferschema-true-isn-39-t-working-for-csv-file-reading-n-spark-structured-streaming suggests otherwise, I'm afraid. – Hypostasize
I gave this answer first; later that answer appeared with additional details from the documentation. Not sure why I got a downvote. – Elnoraelnore

© 2022 - 2024 — McMap. All rights reserved.