How to query JSON data column using Spark DataFrames?

I have a Cassandra table that for simplicity looks something like:

key: text
jsonData: text
blobData: blob

I can create a basic DataFrame for this with Spark and the spark-cassandra-connector:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

I'm struggling, though, to expand the JSON data into its underlying structure. I ultimately want to be able to filter on the attributes within the JSON string and return the blob data; something like jsonData.foo = "bar" and return blobData. Is this currently possible?
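
To make the goal concrete, here is a rough sketch of the query I'd like to be able to write (purely illustrative; foo is a made-up attribute, and it assumes jsonData could somehow be exposed as a struct column):

// Hypothetical: assumes jsonData has been expanded into a struct
df.filter($"jsonData.foo" === "bar")
  .select("blobData")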

Unlettered answered 3/12, 2015 at 15:3 Comment(3)
Is key a unique identifier? – Exercitation
Yes, key is the primary key of the table. – Unlettered
See this excellent example of how to flatten nested JSON: bigdataprogrammers.com/read-nested-json-in-spark-dataframe – Hyps

Spark >= 2.4

If needed, the schema can be determined with the schema_of_json function (note that this assumes an arbitrary row is a valid representative of the schema).

import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

Spark >= 2.1

You can use the from_json function:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

Spark >= 1.6

You can use get_json_object which takes a column and a path:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

and extracts the fields as individual strings, which can then be cast to the expected types.

The path argument is expressed using dot syntax, with a leading $. denoting the document root (since the code above uses string interpolation, $ has to be escaped, hence $$.).
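
For example, a sketch of extracting, casting, and filtering with the same hypothetical k and v fields:

import org.apache.spark.sql.functions.get_json_object

df
  .withColumn("k", get_json_object($"jsonData", "$.k"))
  .withColumn("v", get_json_object($"jsonData", "$.v").cast("double"))
  .filter($"k" === "foo")
  .select($"blobData")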

Spark <= 1.5

Is this currently possible?

As far as I know it is not directly possible. You can try something similar to this:

val df = sc.parallelize(Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

I assume that the blob field cannot be represented in JSON. Otherwise you can omit the splitting and joining:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
  case Row(key: String, json: String) =>
    s"""{"key": "$key", "jsonData": $json}"""
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: string (nullable = true)
//  |-- blobData: string (nullable = true)

An alternative (cheaper, although more complex) approach is to use a UDF to parse the JSON and output a struct or map column. For example, something like this:

import org.apache.spark.sql.functions.udf
import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)
Exercitation answered 3/12, 2015 at 15:36 Comment(2)
Giving a StructType doesn't work in Spark 2.2.0 for to_json: `List<StructField> fields = new ArrayList<>(2); fields.add(DataTypes.createStructField("a", DataTypes.StringType, false)); StructType schema = DataTypes.createStructType(fields); g4.withColumn("dd", functions.to_json(functions.col("a"), schema)).show();` – Conditional
If you're looking for a more comprehensive approach that accounts for schema variability in your JSON data, you should probably use spark.read.json(). I have the details plus a full example in my answer. – Pandemic

zero323's answer is thorough but misses one approach that is available in Spark 2.1+ and is simpler and more robust than using schema_of_json():

import org.apache.spark.sql.functions.from_json

val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))

Here's the Python equivalent:

from pyspark.sql.functions import from_json

json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))

The problem with schema_of_json(), as zero323 points out, is that it inspects a single string and derives a schema from that. If you have JSON data with varied schemas, then the schema you get back from schema_of_json() will not reflect what you would get if you were to merge the schemas of all the JSON data in your DataFrame. Parsing that data with from_json() will then yield a lot of null or empty values where the schema returned by schema_of_json() doesn't match the data.

By using Spark's ability to derive a comprehensive JSON schema from an RDD of JSON strings, we can guarantee that all the JSON data can be parsed.

Example: schema_of_json() vs. spark.read.json()

Here's an example (in Python; the code is very similar for Scala) to illustrate the difference between deriving the schema from a single element with schema_of_json() and deriving it from all the data using spark.read.json().

>>> df = spark.createDataFrame(
...     [
...         (1, '{"a": true}'),
...         (2, '{"a": "hello"}'),
...         (3, '{"b": 22}'),
...     ],
...     schema=['id', 'jsonData'],
... )

a has a boolean value in one row and a string value in another. The merged schema for a would set its type to string. b would be an integer.

Let's see how the different approaches compare. First, the schema_of_json() approach:

>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: boolean (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1|  [true]|
|  2|    null|
|  3|      []|
+---+--------+

As you can see, the JSON schema we derived was very limited. "a": "hello" couldn't be parsed as a boolean and returned null, and "b": 22 was just dropped because it wasn't in our schema.

Now with spark.read.json():

>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: long (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1| [true,]|
|  2|[hello,]|
|  3|  [, 22]|
+---+--------+

Here we have all our data preserved, and with a comprehensive schema that accounts for all the data. "a": true was cast as a string to match the schema of "a": "hello".

The main downside of using spark.read.json() is that Spark will scan through all your data to derive the schema. Depending on how much data you have, that overhead could be significant. If you know that all your JSON data has a consistent schema, it's fine to go ahead and just use schema_of_json() against a single element. If you have schema variability but don't want to scan through all your data, you can set samplingRatio to something less than 1.0 in your call to spark.read.json() to look at a subset of the data.
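
For example, a sketch in Scala of sampling-based inference (samplingRatio is a standard JSON reader option; 0.1 is an arbitrary value you would tune):

// Infer the schema from roughly 10% of the JSON strings instead of all of them
val json_schema = spark.read
  .option("samplingRatio", "0.1")
  .json(df.select("jsonData").as[String])
  .schema

df.withColumn("jsonData", from_json($"jsonData", json_schema))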

Here are the docs for spark.read.json(): Scala API / Python API

Pandemic answered 4/3, 2020 at 17:13 Comment(2)
Thank you so much! This is such an elegant solution and should be the accepted solution to this question. – Cakewalk
This is such a comprehensive solution! Very helpful! – Auraaural

The from_json function is exactly what you're looking for. Your code will look something like:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

// Define whatever struct type matches your JSON
val schema = StructType(Seq(
  StructField("key", StringType, true), 
  StructField("value", DoubleType, true)
))

df.withColumn("jsonData", from_json(col("jsonData"), schema))
Commutator answered 2/9, 2017 at 6:55 Comment(0)

The underlying JSON string is:

"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";

Below is the script to filter the JSON and load the required data into Cassandra.

sqlContext.read.json(rdd)
  .select("column_name1", "column_name2", "column_name3")  // the field names in your JSON
  .write.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
  .mode(SaveMode.Append)
  .save()
Procaine answered 26/7, 2016 at 18:6 Comment(0)

I use the following (available since Spark 2.2.0; I am assuming that your JSON string column is at column index 0):

import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}

def parse(df: DataFrame, spark: SparkSession): DataFrame = {
    val stringDf = df.map((value: Row) => value.getString(0), Encoders.STRING)
    spark.read.json(stringDf)
}

It will automatically infer the schema of your JSON. Documented here: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html
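
Usage would look something like this (a sketch; it assumes the table from the question, with the JSON string selected as the only column, and note that the result contains only the parsed JSON columns):

val jsonDf = parse(df.select("jsonData"), spark)
jsonDf.printSchema()  // shows the schema inferred from the JSON strings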

Caenogenesis answered 9/5, 2019 at 15:18 Comment(0)
