How do I detect if a Spark DataFrame has a column

When I create a DataFrame from a JSON file in Spark SQL, how can I tell if a given column exists before calling .select?

Example JSON schema:

{
  "a": {
    "b": 1,
    "c": 2
  }
}

This is what I want to do:

val potential_columns = Seq("b", "c", "d")
val df = sqlContext.read.json(filename)
potential_columns.map(column => if (df.hasColumn(column)) df.select(s"a.$column"))

but I can't find a good function for hasColumn. The closest I've gotten is to test if the column is in this somewhat awkward array:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
Easiness answered 9/3, 2016 at 22:40 Comment(0)

Just assume it exists and let it fail with Try. Plain and simple, and it supports arbitrary nesting:

import scala.util.Try
import org.apache.spark.sql.DataFrame

def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess

val df = sqlContext.read.json(sc.parallelize(
  """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))

hasColumn(df, "foobar")
// Boolean = false

hasColumn(df, "foo")
// Boolean = true

hasColumn(df, "foo.bar")
// Boolean = true

hasColumn(df, "foo.bar.foobar")
// Boolean = true

hasColumn(df, "foo.bar.foobaz")
// Boolean = false

Or even simpler:

val columns = Seq(
  "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")

columns.flatMap(c => Try(df(c)).toOption)
// Seq[org.apache.spark.sql.Column] = List(
//   foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)

Python equivalent:

from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row


def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False

df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()

has_column(df, "foobar")
## False

has_column(df, "foo")
## True

has_column(df, "foo.bar")
## True

has_column(df, "foo.bar.foobar")
## True

has_column(df, "foo.bar.foobaz")
## False
Independence answered 31/3, 2016 at 11:26 Comment(7)
This works with structured fields as well. The solutions that use the contains function do not! +1Enuresis
On first glance the df(path) or df[col] looks like it would be a very costly test, but this is all lazy DAG building so it's cheap. Is that correct?Lifeboat
@Lifeboat That has little to do with laziness. Columns are not data containers, but components of the query description model. In a broader context, to know anything about a Dataset you have to process and check its logical QueryExecution.analyzed plan; that applies to col / apply (both resolve) and schema / columns alike.Stamata
@10465355saysReinstateMonica Thanks, that's exactly what I meant. I am not referring to lazy directives in Scala, but rather to the phased approach of how Spark creates a logical plan, transforms it into a physical plan, and then executes the tasks on the cluster. If this is resolved at the logical-plan stage then it's cheap.Lifeboat
@Independence The Python equivalent does not work on struct columns which are inside an array, e.g. '{ "name": "test", "address":[{"houseNumber":"1234"}]}'. Should df.select be used instead of df[col]?Joslyn
@Independence How will you check a nested array? df(path) does not accept array elements like property[0].property[1].number.Conference
For me this solution resulted in An exception or error caused a run to abort: org.apache.commons.text.StringSubstitutor.setEnableUndefinedVariableException(Z)Lorg/apache/commons/text/StringSubstitutor; due to some jar-hell problem with commons-text.Spook

Another option which I normally use is

df.columns.contains("column-name-to-check")

This returns a Boolean.
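For example, a quick sketch reusing the sqlContext / sc style from the accepted answer (with the question's nested JSON) shows that only top-level names are reported:

val df = sqlContext.read.json(sc.parallelize(
  """{"a": {"b": 1, "c": 2}}""" :: Nil))

df.columns                   // Array(a)
df.columns.contains("a")     // true
df.columns.contains("a.b")   // false -- nested fields are not listed in columns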

Mozza answered 19/7, 2017 at 2:56 Comment(4)
Yes, that's correct, it doesn't work with nested columns.Mozza
Try:: df.columns.__contains__("column-name-to-check")Alectryomancy
df.columns.__contains__("column-name-to-check") works.Dorman
This is partially incorrect: the output of df.columns is a list (type(df.columns) returns <class 'list'>) and lists don't have a 'contains' attribute. 'col_to_check' in df.columns is fine.Indochina

Actually you don't even need to call select in order to use columns; you can just call it on the DataFrame itself:

// define test data
case class Test(a: Int, b: Int)
val testList = List(Test(1,2), Test(3,4))
val testDF = sqlContext.createDataFrame(testList)

// define the hasColumn function
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)

// then you can just use it on the DF with a given column name
hasColumn(testDF, "a")  // <-- true
hasColumn(testDF, "c")  // <-- false

Alternatively you can define an implicit class using the pimp-my-library pattern, so that the hasColumn method is available on your DataFrames directly:

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
    def hasColumn(colName: String) = df.columns.contains(colName)
}

Then you can use it as:

testDF.hasColumn("a") // <-- true
testDF.hasColumn("c") // <-- false
Shepley answered 10/3, 2016 at 11:1 Comment(1)
This doesn't work with nested columns, e.g. from the JSON {"a":{"b":1,"c":0}}.Easiness

Try is not optimal, as it will evaluate the expression inside Try before making the decision.

For large data sets, use the following in Scala:

df.schema.fieldNames.contains("column_name")
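
Note that schema.fieldNames only covers top-level fields. A minimal sketch of extending the same schema-based idea to dotted nested paths (the hasNestedColumn helper below is hypothetical, not part of this answer, and it does an exact, case-sensitive name match, unlike Spark's default case-insensitive resolution):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Resolve a dotted path purely against the schema, without building a Column
def hasNestedColumn(df: DataFrame, path: String): Boolean = {
  def check(schema: StructType, parts: List[String]): Boolean = parts match {
    case Nil => true
    case name :: rest =>
      schema.fields.find(_.name == name).exists { field =>
        rest.isEmpty || (field.dataType match {
          case s: StructType => check(s, rest)
          case _             => false
        })
      }
  }
  check(df.schema, path.split('.').toList)
}

// Assuming some DataFrame df:
hasNestedColumn(df, "column_name")  // same result as fieldNames.contains for top-level names
hasNestedColumn(df, "a.b")          // also resolves struct fields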
Propriety answered 23/1, 2017 at 5:51 Comment(1)
I agree Try is not optimal. What I do is: I create a column with the array of fields and then test with array_contains: val fields = df.schema.fieldNames; df.withColumn("fields", lit(fields)).withColumn("has_column", when(array_contains($"fields", "field1"), lit(true)))Jeromejeromy

For those who stumble across this looking for a Python solution, I use:

if 'column_name_to_check' in df.columns:
    # do something

When I tried @Jai Prakash's answer of df.columns.contains('column-name-to-check') using Python, I got AttributeError: 'list' object has no attribute 'contains'.

Bland answered 10/5, 2019 at 18:0 Comment(2)
Does this way of checking for a column in the DataFrame slow down the Spark processing?Cashmere
@Cashmere Can you help me better understand your question? Is there some other approach against which you want to compare performance?Bland

Your other option for this would be to do some array manipulation (in this case an intersect) on the df.columns and your potential_columns.

// Loading some data (so you can just copy & paste right into spark-shell)
case class Document( a: String, b: String, c: String)
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF

// The columns we want to extract
val potential_columns = Seq("b", "c", "d")

// Get the intersect of the potential columns and the actual columns, 
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show

Alas this will not work for your inner object scenario above. You will need to look at the schema for that.

I'm going to change your potential_columns to fully qualified column names:

val potential_columns = Seq("a.b", "a.c", "a.d")

// Our object model
case class Document( a: String, b: String, c: String)
case class Document2( a: Document, b: String, c: String)

// And some data...
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "b2", "c2")), 2).toDF

// We go through each of the fields in the schema.
// For StructTypes we return an array of parentName.fieldName
// For everything else we return an array containing just the field name
// We then flatten the complete list of field names
// Then we intersect that with our potential_columns leaving us just a list of column we want
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(
  df.schema
    .map(a => a.dataType match {
      case s: org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x)
      case _ => Array(a.name)
    })
    .flatMap(x => x)
    .intersect(potential_columns)
    .map(df(_)): _*
).show

This only goes one level deep, so to make it generic you would have to do more work.
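
One way to go deeper, as a sketch only (the allFieldNames helper below is not part of the original answer and only descends into structs, not arrays or maps), is to recurse over the schema and collect fully qualified names:

import org.apache.spark.sql.types.StructType

// Recursively collect fully qualified field names from the schema
def allFieldNames(schema: StructType, prefix: String = ""): Seq[String] =
  schema.fields.toSeq.flatMap { f =>
    val name = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
    f.dataType match {
      case s: StructType => name +: allFieldNames(s, name)
      case _             => Seq(name)
    }
  }

// The intersect trick then works at any depth
df.select(allFieldNames(df.schema).intersect(potential_columns).map(df(_)): _*).show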

Complexioned answered 10/3, 2016 at 14:54 Comment(0)

If you shred your JSON using a schema definition when you load it, then you don't need to check for the column. If it's not in the JSON source, it will appear as a null column.

        val schemaJson = """
  {
      "type": "struct",
      "fields": [
          {
            "name": field1
            "type": "string",
            "nullable": true,
            "metadata": {}
          },
          {
            "name": field2
            "type": "string",
            "nullable": true,
            "metadata": {}
          }
      ]
  }
        """
    val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

    val djson = sqlContext.read
    .schema(schema )
    .option("badRecordsPath", readExceptionPath)
    .json(dataPath)
Humfried answered 12/6, 2019 at 10:16 Comment(0)
import scala.util.Try

def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
  Try(df.select(colName)).isSuccess

Use the above function to check for the existence of a column, including nested column names.
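
For example, against the question's nested JSON (assuming the same sqlContext / sc setup as in the accepted answer):

val df = sqlContext.read.json(sc.parallelize(
  """{"a": {"b": 1, "c": 2}}""" :: Nil))

hasColumn(df, "a.b")  // true
hasColumn(df, "a.d")  // false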

Stowaway answered 12/4, 2019 at 5:43 Comment(0)

In PySpark, df.columns gives you a list of the columns in the DataFrame, so "colName" in df.columns would return True or False. Give it a try. Good luck!

Equiprobable answered 8/11, 2019 at 23:41 Comment(1)
df1.columns displays ['bankData', 'reference', 'submissionTime'], but df1['bankData']['userAddress'].columns displays Column<'bankData[userAddress][columns]'> and doesn't show me the struct; am I missing something?Grindelia

For nested columns you can use

df.schema.simpleString().find('column_name')
Graduate answered 1/6, 2020 at 22:17 Comment(1)
This does not seem reliable to me if you have a Column named similarly to the extraneous text in the Schema string.Armitage

For nested columns, you can use the following code:

from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType


def check_field_exists(x, key):
    # Returns the nested field's value if it exists, otherwise None
    try:
        return eval(f"x.{key}")
    except AttributeError:
        return None

get_field_udf = udf(check_field_exists, StringType())
df = df.withColumn("b", get_field_udf(col("a"), lit("b")))
Wrest answered 14/5 at 13:29 Comment(0)
