Spark Row to JSON

4

20

I would like to create JSON from a Spark 1.6 DataFrame (using Scala). I know there is the simple solution of doing df.toJSON.

However, my problem looks a bit different. Consider for instance a dataframe with the following columns:

|  A  |     B     |  C1  |  C2  |    C3   |
-------------------------------------------
|  1  | test      |  ab  |  22  |  TRUE   |
|  2  | mytest    |  gh  |  17  |  FALSE  |

I would like to have at the end a dataframe with

|  A  |     B     |                        C                   |
----------------------------------------------------------------
|  1  | test      | { "c1" : "ab", "c2" : 22, "c3" : TRUE }    |
|  2  | mytest    | { "c1" : "gh", "c2" : 17, "c3" : FALSE }   |

where C is a JSON object containing C1, C2, C3. Unfortunately, at compile time I do not know what the dataframe looks like (except for the columns A and B, which are always "fixed").

As for the reason why I need this: I am using Protobuf for sending around the results. Unfortunately, my dataframe sometimes has more columns than expected, and I would still like to send those via Protobuf, but I do not want to specify all the columns in the definition.

How can I achieve this?

Marianmariana answered 22/3, 2016 at 14:56 Comment(0)
28

Spark 2.1 should have native support for this use case (see #15354).

import org.apache.spark.sql.functions.{struct, to_json}
// the $ column syntax requires: import spark.implicits._

df.select(to_json(struct($"c1", $"c2", $"c3")))
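Since the extra columns are not known at compile time, the same idea can be applied dynamically by reading the column names from the runtime schema. A minimal sketch, assuming only A and B are fixed and everything else should be packed into C (the names fixed and rest are just for illustration):

import org.apache.spark.sql.functions.{col, struct, to_json}

// Columns other than the fixed A and B are discovered at runtime.
val fixed = Seq("A", "B")
val rest = df.columns.filterNot(fixed.contains(_)).map(col)

df.select(
  col("A"),
  col("B"),
  to_json(struct(rest: _*)).alias("C")  // pack all remaining columns into one JSON string
)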
Gooding answered 1/11, 2016 at 21:57 Comment(3)
Is struct a StructType in Java? Can you give me a Java implementation?Aha
While using the above solution I am getting a JSON result with extra escape characters, as below: \"SIGNAL\":[{\"TIME\":1569382072016,\"VALUE\":-9}],\"SIGNAL01\":[{\"TIME\":1569382099654,\"VALUE\":8.0}]}"} How can I remove those extra characters from the result? @Michael ArmbrustCoastguardsman
Does to_json preserve order?Veii
13

I use this command (PySpark) to solve the to_json problem:

from pyspark.sql.functions import col, struct, to_json

output_df = df.select(to_json(struct(col("*"))).alias("content"))
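For a Scala version of the same one-liner (assuming Spark 2.1+, where to_json is available), something along these lines should work:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Serialize every column of each row into a single JSON string column named "content".
val outputDf = df.select(to_json(struct(col("*"))).alias("content"))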
Syphilology answered 22/5, 2019 at 5:29 Comment(0)
7

Here, no JSON parser is needed, and it adapts to your schema:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

df.select(
  col(df.columns(0)),
  col(df.columns(1)),
  concat(
    lit("{"), 
    concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => {
      val c = dt._1;
      val t = dt._2;
      concat(
        lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "")  ),
        col(c),
        lit(if(t=="StringType") "\""; else "") 
      )
    }):_*), 
    lit("}")
  ) as "C"
).collect()
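For the sample data in the question, this should produce rows roughly like the following (a hand-derived expectation, not captured output; non-string columns come out unquoted because their dtypes are not StringType):

// Expected shape of the collected rows (approximate):
// [1, test,   {"C1":"ab","C2":22,"C3":true}]
// [2, mytest, {"C1":"gh","C2":17,"C3":false}]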
Duodiode answered 23/3, 2016 at 0:49 Comment(1)
Yup and yup. JSON is a bit hacky in general if you ask me.Duodiode
5

First, let's convert the C columns to a struct:

import org.apache.spark.sql.functions.struct

val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))

This structure can be converted to JSONL using toJSON as before:

dfStruct.toJSON.collect
// Array[String] = Array(
//   {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, 
//   {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})

I am not aware of any built-in method that can convert a single column, but you can either convert it individually and join, or use your favorite JSON parser in a UDF.

case class C(C1: String, C2: Int, C3: Boolean)

object CJsonizer {
  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.Serialization
  import org.json4s.jackson.Serialization.write

  implicit val formats = Serialization.formats(org.json4s.NoTypeHints)

  def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}


import org.apache.spark.sql.functions.udf

val cToJSON = udf((c1: String, c2: Int, c3: Boolean) =>
  CJsonizer.toJSON(c1, c2, c3))

df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
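If the column names are not known up front (the situation in the question), one option is to pack the unknown columns into a struct and serialize the resulting Row inside the UDF. A sketch of that idea, assuming Spark 2.x (where a UDF can take a struct argument as a Row) and json4s for the serialization; the names rowToJson and rest are made up for illustration:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Serialize a struct (received as a Row) using its runtime schema,
// so no column names are hard-coded.
val rowToJson = udf { (row: Row) =>
  implicit val formats = Serialization.formats(NoTypeHints)
  Serialization.write(row.schema.fieldNames.zip(row.toSeq).toMap)
}

val rest = df.columns.filterNot(Set("A", "B")).map(col)
df.select(col("A"), col("B"), rowToJson(struct(rest: _*)).alias("C"))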
Silica answered 22/3, 2016 at 14:56 Comment(6)
Actually, my question is really about the second part: how to convert the individual columns to JSON. You mention join-ing the columns, but that does not really work, as I have an RDD[String] on the one hand and a DataFrame on the other.Marianmariana
Like he says, just use a UDF. You don't even have to use a full-blown JSON parser in the UDF -- you can just craft a JSON string on the fly using map and mkString. You will probably need to use DataFrame.columns or possibly DataFrame.dtypes to both craft the select statement and as the basis of the map in the UDF.Duodiode
I agree with @DavidGriffin - udf can be the simplest solution here. And Jackson and json4s are already dragged with other dependencies.Silica
My problem with all of the JSON parsers I have seen is that you need to know in advance what the schema looks like -- like with your solution @Silica -- it only works for those specific columns. What if the names were different? What if there were more than 3 columns?Duodiode
The only problem I see is that Row is an extremely ugly data structure. Otherwise you can simply build an arbitrarily complex AST with Lift / json4s and convert it to JSON. But truth be told, it is too much effort to put into an SO answer.Silica
Row is ugly for the same reason I hate dealing with JSON in Scala -- it's a clash of cultures: loosey-goosey vs. strong, static typing. SQL is loosey-goosey -- you are a select away from defining a new type -- hence Row is messy. Avro's GenericRecord has the same problem.Duodiode
