How to create an empty DataFrame with a specified schema?

I want to create an empty DataFrame with a specified schema in Scala. I have tried reading an empty JSON file, but I don't think that is the best practice.

Bathyal answered 17/7, 2015 at 13:58 Comment(0)
156

Let's assume you want a data frame with the following schema:

root
 |-- k: string (nullable = true)
 |-- v: integer (nullable = false)

You simply define the schema for the data frame and use an empty RDD[Row]:

import org.apache.spark.sql.types.{
    StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("k", StringType, true) ::
    StructField("v", IntegerType, false) :: Nil)

// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema) 
spark.createDataFrame(sc.emptyRDD[Row], schema)

The PySpark equivalent is almost identical:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("k", StringType(), True), StructField("v", IntegerType(), False)
])

# Spark < 2.0
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)

# or
# df = sc.parallelize([]).toDF(schema)

Using implicit encoders (Scala only) with Product types like Tuple:

import spark.implicits._

Seq.empty[(String, Int)].toDF("k", "v")

or case class:

case class KV(k: String, v: Int)

Seq.empty[KV].toDF

or

spark.emptyDataset[KV].toDF
Kimberly answered 17/7, 2015 at 14:54 Comment(2)
This is the most appropriate answer - complete, and also useful if you want to reproduce the schema of an existing dataset quickly. I don't know why it is not the accepted one.Glossy
How to create the df with the trait instead of case class: #64277452Mabelmabelle
48

As of Spark 2.0.0, you can do the following.

Case Class

Let's define a Person case class:

scala> case class Person(id: Int, name: String)
defined class Person

Import the implicit Encoders from the spark SparkSession:

scala> import spark.implicits._
import spark.implicits._

And use SparkSession to create an empty Dataset[Person]:

scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]

Schema DSL

You could also use a Schema "DSL" (see Support functions for DataFrames in org.apache.spark.sql.ColumnName).

scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)

scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> emptyDF.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
Grapnel answered 16/8, 2016 at 7:7 Comment(4)
Hi, the compiler says that spark.emptyDataset does not exist in my module. How do I use it? Is there something (correct) similar to the (incorrect) val df = apache.spark.emptyDataset[RawData]?Mikesell
@PeterKrauss spark is the value you create using SparkSession.builder, not part of the org.apache.spark package. There are two spark names in use. It's the spark you have available in spark-shell out of the box.Grapnel
Thanks Jacek. I corrected it: the SparkSession.builder object is passed as a parameter (seems like the best solution) from the initial general initialization, and now it is running.Mikesell
Is there a way to create the empty dataframe using trait instead of case class : #64277452Mabelmabelle
5

A Java version that creates an empty Dataset<Row>:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public Dataset<Row> emptyDataSet() {

    SparkSession spark = SparkSession.builder().appName("Simple Application")
                .config("spark.master", "local").getOrCreate();

    // An empty list of rows combined with the schema yields an empty DataFrame
    Dataset<Row> emptyDataSet = spark.createDataFrame(new ArrayList<Row>(), getSchema());

    return emptyDataSet;
}

public StructType getSchema() {

    String schemaString = "column1 column2 column3 column4 column5";

    List<StructField> fields = new ArrayList<>();

    // Add a long index column first
    StructField indexField = DataTypes.createStructField("column0", DataTypes.LongType, true);
    fields.add(indexField);

    // Then one nullable string column per name in schemaString
    for (String fieldName : schemaString.split(" ")) {
        StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
        fields.add(field);
    }

    StructType schema = DataTypes.createStructType(fields);

    return schema;
}
Dwell answered 16/5, 2018 at 4:32 Comment(0)
3
import scala.reflect.runtime.{universe => ru}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// Derive the schema from a type via reflection and pair it with an empty RDD[Row]
def createEmptyDataFrame[T: ru.TypeTag] =
  hiveContext.createDataFrame(
    sc.emptyRDD[Row],
    ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
  )

case class RawData(id: String, firstname: String, lastname: String, age: Int)
val sourceDF = createEmptyDataFrame[RawData]
Aviate answered 19/9, 2016 at 10:21 Comment(0)
3

Here you can create a schema using StructType in Scala and pass an empty RDD, so you are able to create an empty table. The following code does the same.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object EmptyTable extends App {
  val conf = new SparkConf
  val sc = new SparkContext(conf)

  // Create the SparkSession
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

  // Schema for three columns
  val schema = StructType(
    StructField("Emp_ID", LongType, true) ::
      StructField("Emp_Name", StringType, false) ::
      StructField("Emp_Salary", LongType, false) :: Nil)

  // Empty RDD
  val dataRDD = sc.emptyRDD[Row]

  // Pass the RDD and schema to create the DataFrame
  val newDFSchema = sparkSession.createDataFrame(dataRDD, schema)

  newDFSchema.createOrReplaceTempView("tempSchema")

  sparkSession.sql("create table Finaltable AS select * from tempSchema")
}
Pennypennyaliner answered 31/10, 2017 at 10:51 Comment(0)
3

This is helpful for testing purposes.

import spark.implicits._
Seq.empty[String].toDF()
Perni answered 10/9, 2020 at 15:35 Comment(1)
How to create empty df from trait instead :#64277452Mabelmabelle
2

Here is a solution that creates an empty DataFrame in PySpark 2.0.0 or later.

from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sc = spark.sparkContext
sqlContext = SQLContext(sc)
schema = StructType([StructField('col1', StringType(), False), StructField('col2', IntegerType(), True)])
sqlContext.createDataFrame(sc.emptyRDD(), schema)
Pyrite answered 5/12, 2016 at 9:22 Comment(0)
1

I had a special requirement wherein I already had a DataFrame, but under a certain condition I had to return an empty DataFrame, so I returned df.limit(0) instead.
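
A minimal sketch of that pattern (hypothetical names; limit(0) keeps the schema but returns no rows):

// Hypothetical helper: return the data only when the condition holds,
// otherwise return an empty DataFrame with the same schema via limit(0)
def filteredOrEmpty(df: org.apache.spark.sql.DataFrame, conditionMet: Boolean): org.apache.spark.sql.DataFrame =
  if (conditionMet) df else df.limit(0)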

Gaynell answered 22/11, 2020 at 17:30 Comment(0)
0

I'd like to add the following syntax which was not yet mentioned:

Seq[(String, Integer)]().toDF("k", "v")

It makes it clear that the () part is for the values. It's empty, so the DataFrame is empty.

This syntax is also beneficial for adding null values manually: because java.lang.Integer (unlike Scala's Int) is a nullable reference type, it just works, while other options either don't or are overly verbose.
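
For example, a minimal sketch of that use (assuming a SparkSession named spark with spark.implicits._ in scope):

import spark.implicits._

// java.lang.Integer is a nullable boxed type, so null can go straight into column "v"
val df = Seq[(String, Integer)](("a", 1), ("b", null)).toDF("k", "v")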

Planet answered 20/6, 2022 at 19:50 Comment(0)
0

We were having issues with the emptyRDD method after upgrading to Databricks Runtime 13.3 and enabling Unity Catalog in Databricks. The solution below works as a replacement in both cases.

import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.sql.Row
import java.util.ArrayList

val schema = new StructType()
  .add("column1", StringType, true)
  .add("column2", StringType, true)

val df = spark.createDataFrame(
  new ArrayList[Row],
  schema
)
df.count()
Lionfish answered 20/12, 2023 at 18:55 Comment(0)
-3

As of Spark 2.4.3

val df = SparkSession.builder().getOrCreate().emptyDataFrame
Disjoint answered 17/7, 2019 at 0:51 Comment(1)
This does not solve the schema part of the question.Sweptback
