How to convert Row to json in Spark 2 Scala
Asked Answered
A

8

14

Is there a simple way to convert a given Row object to JSON?

I found this about converting a whole DataFrame to JSON output: Spark Row to JSON

But I just want to convert a single Row to JSON. Here is pseudo code for what I am trying to do.

More precisely, I am reading JSON as input into a DataFrame. I am producing a new output that is mainly based on columns, but with one JSON field for all the info that does not fit into the columns.

My question is: what is the easiest way to write this function, convertRowToJson()?

def convertRowToJson(row: Row): String = ???

def transformVenueTry(row: Row): Try[Venue] = {
  Try({
    val name = row.getString(row.fieldIndex("name"))
    val metadataRow = row.getStruct(row.fieldIndex("meta"))
    val score: Double = calcScore(row)
    val combinedRow: Row = metadataRow ++ ("score" -> score)
    val jsonString: String = convertRowToJson(combinedRow)
    Venue(name = name, json = jsonString)
  })
}

Psidom's solution:

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

only works if the Row has a single level, not with nested Rows. This is the schema:

StructType(
    StructField(indicator,StringType,true),
    StructField(range,
        StructType(
            StructField(currency_code,StringType,true),
            StructField(maxrate,LongType,true),
            StructField(minrate,LongType,true)),true))
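
For reference, here is the same schema sketched as Scala code (handy when building test Rows):

import org.apache.spark.sql.types._

// Same structure as the printed schema above
val schema = StructType(Seq(
  StructField("indicator", StringType, nullable = true),
  StructField("range", StructType(Seq(
    StructField("currency_code", StringType, nullable = true),
    StructField("maxrate", LongType, nullable = true),
    StructField("minrate", LongType, nullable = true)
  )), nullable = true)
))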

I also tried Artem's suggestion, but it did not compile:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
  val sparkContext = sqlContext.sparkContext
  import sparkContext._
  import sqlContext.implicits._
  import sqlContext._
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataFrame = rowRDD.toDF() // XXX does not compile: toDF() needs an implicit Encoder, and Spark provides none for Row
  dataFrame
}
Allusion answered 11/1, 2017 at 22:28 Comment(0)
A
5

I need to read JSON input and produce JSON output. Most fields are handled individually, but a few JSON sub-objects just need to be preserved.

When Spark reads a DataFrame, it turns each record into a Row. The Row is a JSON-like structure that can be transformed and written out to JSON.

But I need to pull some of those sub JSON structures out into a string to use as a new field.

This can be done like this:

import org.apache.spark.sql.functions.to_json
val dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address"))

location.address is the path to the sub JSON object in the incoming JSON-based DataFrame, and address_json is the column name for that object converted to a string version of the JSON.

to_json is available starting with Spark 2.1.

If you generate the output JSON using json4s, address_json should be parsed back into an AST representation; otherwise the output JSON will contain the address_json part as an escaped string.
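
For example, a minimal json4s sketch (the venue name and address values here are made up); parsing the address_json string back into an AST keeps it as a nested object in the final output instead of an escaped string:

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// addressJson stands in for the value of the address_json column
val addressJson: String = """{"street":"Main St","city":"Springfield"}"""

// Embed the parsed AST instead of the raw string, so it is not escaped
val output: JValue = ("name" -> "some venue") ~ ("address" -> parse(addressJson))
compact(render(output))
// {"name":"some venue","address":{"street":"Main St","city":"Springfield"}}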

Allusion answered 25/2, 2017 at 12:4 Comment(0)
A
24

You can use getValuesMap to convert the Row object to a Map and then convert it to JSON:

import scala.util.parsing.json.JSONObject
import org.apache.spark.sql._

val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")    
val row = df.first()          // this is an example row object

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

convertRowToJSON(row)
// res46: String = {"A" : 1, "B" : 2, "C" : 3}
Allineallis answered 11/1, 2017 at 22:50 Comment(3)
Correction: it actually only works for the first level of the Map / Struct; for a nested Map you will only see the values, not the keys.Allusion
@SamiBadawi Were you able to find a solution for nested Maps?Ambit
I am also having problems with nesting.Terrence
D
4

Note that the Scala class scala.util.parsing.json.JSONObject is deprecated and does not support null values.

@deprecated("This class will be removed.", "2.11.0")

"JSONFormat.defaultFormat doesn't handle null values"

https://issues.scala-lang.org/browse/SI-5092
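
If nulls matter, here is a minimal sketch of the same Row-to-JSON conversion using json4s instead of the deprecated class (this is an assumed replacement, not part of the original answer):

import org.apache.spark.sql.Row
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

// Serialize the Row's value map with json4s; null values come out as JSON null
def rowToJsonString(row: Row): String = {
  implicit val formats: DefaultFormats.type = DefaultFormats
  Serialization.write(row.getValuesMap[Any](row.schema.fieldNames))
}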

Dinny answered 16/7, 2017 at 7:4 Comment(1)
Thanks Arnon. There has been some talk about modernizing json support in Scala.Allusion
L
2

JSON has a schema, but a bare Row does not necessarily carry one, so you need to apply a schema to the Row and then convert it to JSON. Here is how you can do it.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

def convertRowToJson(row: Row): String = {

  val schema = StructType(
      StructField("name", StringType, true) ::
      StructField("meta", StringType, false) :: Nil)

  // applySchema is deprecated and expects an RDD, not a single Row;
  // wrap the Row in an RDD, apply the schema, and take the first JSON string
  val rowRDD = sqlContext.sparkContext.makeRDD(row :: Nil)
  sqlContext.createDataFrame(rowRDD, schema).toJSON.first()
}
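
Hypothetical usage, assuming a DataFrame df whose rows match the name and meta columns above:

val json: String = convertRowToJson(df.first())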
Lanita answered 12/1, 2017 at 4:23 Comment(0)
C
1

Essentially, you can have a DataFrame which contains just one row. Thus, you can filter your initial DataFrame and then parse it to JSON.
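
A minimal sketch of that idea; the column name id and the value 42 are placeholders:

import org.apache.spark.sql.functions.col

// Narrow the DataFrame down to the one row of interest, then serialize it
val singleRowJson: String = df.filter(col("id") === 42).toJSON.first()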

Celtuce answered 11/1, 2017 at 22:47 Comment(1)
Thanks for your suggestion. I tried your approach: def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = { val sparkContext = sqlContext.sparkContext import sparkContext._ import sqlContext.implicits._ import sqlContext._ val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil) val dataFrame = rowRDD.toDF() //XXX does not compile dataFrame } It did not compile.Allusion
A
1

I had the same issue: I had Parquet files with a canonical schema (no arrays), and I only wanted to get JSON events. I did as follows, and it seems to work just fine (Spark 2.1):

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import scala.util.parsing.json.JSONFormat.ValueFormatter
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject}

def getValuesMap[T](row: Row, schema: StructType): Map[String,Any] = {
  schema.fields.map {
    field =>
      try{
        if (field.dataType.typeName.equals("struct")){
          field.name -> getValuesMap(row.getAs[Row](field.name), field.dataType.asInstanceOf[StructType])
        }else{
          field.name -> row.getAs[T](field.name)
        }
      }catch {case e : Exception =>{field.name -> null.asInstanceOf[T]}}
  }.filter(xy => xy._2 != null).toMap
}

def convertRowToJSON(row: Row, schema: StructType): JSONObject = {
  val m: Map[String, Any] = getValuesMap(row, schema)
  JSONObject(m)
}
// Since I am using Any (not Nothing), the regular ValueFormatter does not work, so I added the case jmap: Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
val defaultFormatter : ValueFormatter = (x : Any) => x match {
  case s : String => "\"" + JSONFormat.quoteString(s) + "\""
  case jo : JSONObject => jo.toString(defaultFormatter)
  case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
  case ja : JSONArray => ja.toString(defaultFormatter)
  case other => other.toString
}

val someFile = "s3a://bucket/file"
val df: DataFrame = sqlContext.read.load(someFile)
val schema: StructType = df.schema
val jsons = df.rdd.map(row => convertRowToJSON(row, schema)) // map over the underlying RDD: Dataset.map would need an Encoder for JSONObject
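
For reference, a hypothetical usage sketch that renders the results with the custom formatter after collecting to the driver:

// toString(defaultFormatter) makes nested Maps print as JSON objects
jsons.collect().foreach(jo => println(jo.toString(defaultFormatter)))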
Arlenaarlene answered 30/10, 2017 at 16:46 Comment(0)
C
1

If you are iterating through a DataFrame, you can directly convert it to a Dataset of JSON strings and iterate over that:

val df_json = df.toJSON
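
The resulting Dataset has a single string column named value; a small sketch of renaming it (the name body is just an example):

// df.toJSON yields a Dataset[String] whose column is named "value"; toDF renames it
val df_json = df.toJSON.toDF("body")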

Candicandia answered 18/11, 2019 at 10:13 Comment(2)
Please edit your question, or otherwise prefer comments instead. In any case, read the rules of the community.Elyn
Is it possible to set the column name? By default it is 'value' and I want to change it to 'body'Chasseur
A
0

I combined the suggestions from Artem, KiranM, and Psidom. After a lot of trial and error I came up with this solution, which I tested for nested structures:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

def row2Json(row: Row, sqlContext: SQLContext): String = {
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataframe = sqlContext.createDataFrame(rowRDD, row.schema)
  dataframe.toJSON.first
}

This solution worked, but only while running on the driver.
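
A possible way around the driver-only limitation, sketched with the built-in to_json and struct functions mentioned above (Spark 2.1+); the output column name row_json is just illustrative:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Run the conversion as a column expression on the executors instead of
// building a one-row DataFrame per Row on the driver
val withJson = dataFrame.withColumn("row_json", to_json(struct(dataFrame.columns.map(col): _*)))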

Allusion answered 13/1, 2017 at 19:6 Comment(0)
