Spark 2.0 implicit encoder: dealing with a missing column when the type is Option[Seq[String]] (Scala)

I'm having some trouble encoding data when columns of type Option[Seq[String]] are missing from our data source. Ideally, I would like the missing columns to be filled with None.

Scenario:

We are reading in some Parquet files that have column1 but not column2.

We load the data from these Parquet files into a Dataset and cast it to MyType.

case class MyType(column1: Option[String], column2: Option[Seq[String]])

sqlContext.read.parquet("dataSource.parquet").as[MyType]

org.apache.spark.sql.AnalysisException: cannot resolve 'column2' given input columns: [column1];

Is there a way to create the Dataset with column2 data as None?

Dutra answered 3/1, 2017 at 23:50

In simple cases you can provide an initial schema that is a superset of the expected schemas. For example, in your case:

import spark.implicits._

// Schema derived from the case class; a superset of what the files contain
val schema = Seq[MyType]().toDF.schema

// Write a sample Parquet file that only has column1
Seq("a", "b", "c").map(Option(_))
  .toDF("column1")
  .write.parquet("/tmp/column1only")

// Reading with the explicit schema fills the missing column2 with null
val df = spark.read.schema(schema).parquet("/tmp/column1only").as[MyType]
df.show
+-------+-------+
|column1|column2|
+-------+-------+
|      a|   null|
|      b|   null|
|      c|   null|
+-------+-------+
df.first
MyType = MyType(Some(a),None)

This approach can be a bit fragile (the provided schema has to stay in sync with what the files actually contain), so in general it is better to use SQL literals to fill in the blanks:

import org.apache.spark.sql.functions.lit

spark.read.parquet("/tmp/column1only")
  // the string cast could equivalently be .cast(ArrayType(StringType))
  .withColumn("column2", lit(null).cast("array<string>"))
  .as[MyType]
  .first
MyType = MyType(Some(a),None)
Merralee answered 4/1, 2017 at 0:51 Comment(2)
How would this situation be handled if a column (or a number of columns) is optionally missing, i.e. may or may not be present? Say you want to conform data from a KV map to a case class based on the same key-to-value mapping? – Serieswound
@OmkarNeogi There is a solution that creates an implicit method on a DataFrame which checks for missing fields and fills them with null: #44886711 – Brandenburg
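
A minimal sketch of the approach described in that comment (the ConformOps and conformTo names are hypothetical; it assumes every missing column can be represented as a typed null):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

// Hypothetical helper: add every column from `schema` that is missing
// in `df` as a typed null, then select the columns in schema order.
implicit class ConformOps(df: DataFrame) {
  def conformTo(schema: StructType): DataFrame = {
    val existing = df.columns.toSet
    val cols: Seq[Column] = schema.fields.toSeq.map { f =>
      if (existing.contains(f.name)) df(f.name)
      else lit(null).cast(f.dataType).as(f.name)
    }
    df.select(cols: _*)
  }
}

// Usage (hypothetical):
// spark.read.parquet("/tmp/column1only")
//   .conformTo(Seq[MyType]().toDF.schema)
//   .as[MyType]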

Since Spark 3.1, you can use the allowMissingColumns parameter of Dataset#unionByName to introduce the missing columns:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val schema = Seq[MyType]().toDF.schema  // expected schema, as in the accepted answer

spark.read.parquet("dataSource.parquet")
  // Add missing columns by unioning with an empty DataFrame that has the expected schema
  .unionByName(spark.createDataFrame(sc.emptyRDD[Row], schema), true)
  // Extra check that the resulting schema really matches
  .as(RowEncoder.apply(schema))
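
The expected schema can also be derived directly from the case class without building an empty DataFrame first; a minimal sketch using Encoders.product:

import org.apache.spark.sql.Encoders

// StructType derived straight from the case class definition
val schema = Encoders.product[MyType].schema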
Aramen answered 20/2 at 8:5
