How to convert RDD[GenericRecord] to DataFrame in Scala?

I get tweets from a Kafka topic with Avro (serializer and deserializer). Then I create a Spark consumer which extracts the tweets into a DStream of RDD[GenericRecord]. Now I want to convert each RDD to a DataFrame to analyse these tweets via SQL. Any solution to convert RDD[GenericRecord] to a DataFrame, please?

Balmy answered 13/11, 2017 at 12:46 Comment(1)
Can you update with some sample data of RDD[GenericRecord] by doing foreach(println)? – Tetzel

I spent some time trying to make this work (especially figuring out how to deserialize the data properly, but it looks like you already have that covered)... UPDATED
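For anyone who still needs the deserialization step, here is one way to turn raw Avro bytes into a GenericRecord. This is a minimal sketch, not part of the original answer; it assumes the writer schema is available locally:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Decode raw Avro bytes into a GenericRecord using the writer schema.
// (With Confluent's wire format you would first strip the 5-byte magic/schema-id prefix.)
def deserialize(bytes: Array[Byte], schema: Schema): GenericRecord = {
  val reader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}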

//Imports needed below (Spark 2.x with the com.databricks spark-avro package)
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.SchemaConverters

//Define function to convert from GenericRecord to Row
def genericRecordToRow(record: GenericRecord, sqlType: SchemaConverters.SchemaType): Row = {
  // Copy the field values into an array, position by position
  val objectArray = new Array[Any](record.getSchema.getFields.size)
  import scala.collection.JavaConverters._
  for (field <- record.getSchema.getFields.asScala) {
    objectArray(field.pos) = record.get(field.pos)
  }
  new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
}

//Inside your stream's foreachRDD
val yourGenericRecordRDD = ...
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(schema)

val rowRDD = yourGenericRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD, sqlType.dataType.asInstanceOf[StructType])

As you can see, I am using SchemaConverters to derive the DataFrame structure from the same schema that you used to deserialize (this could be more painful with a schema registry; see the sketch after the dependency block). For this you need the following dependency:

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>
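If you build with sbt instead of Maven, the equivalent line (same 3.2.0 version) is:

libraryDependencies += "com.databricks" %% "spark-avro" % "3.2.0"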

You may need to change the version to match your Spark and Scala versions.
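As for the schema registry pain mentioned above: if your schema lives in a Confluent schema registry rather than in a local file, one way to fetch it is the sketch below. It assumes the io.confluent kafka-schema-registry-client library; the registry URL and subject name are placeholders:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema

// Fetch the latest registered schema for the topic's value subject
val registryClient = new CachedSchemaRegistryClient("http://localhost:8081", 100)
val metadata = registryClient.getLatestSchemaMetadata("tweets-value")
val schema = new Schema.Parser().parse(metadata.getSchema)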

UPDATE: the code above only works for flat Avro schemas.

For nested structures I used something different. You can copy the class SchemaConverters (it has to live inside com.databricks.spark.avro because it uses some protected classes from the Databricks package), or you can try the spark-bigquery dependency. The factory method will not be accessible by default, so you will need to create a class inside the com.databricks.spark.avro package to reach it.

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

// Must be an object: it is called below as SchemaConverterUtils.converterSql(...)
object SchemaConverterUtils {

  def converterSql(schema : Schema, sqlType : StructType) = {
    createConverterToSQL(schema, sqlType)
  }

}

After that you should be able to convert the data like this:

import scala.util.Try

val schema = ... // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
...
//inside foreachRDD
val genericRecordRDD = deserializeAvroData(rdd)
...
val converter = SchemaConverterUtils.converterSql(schema, sqlType)
...
// Drop records that fail to convert instead of failing the whole batch
val rowRdd = genericRecordRDD.flatMap(record =>
  Try(converter(record).asInstanceOf[Row]).toOption
)
//To DataFrame
val df = sqlContext.createDataFrame(rowRdd, sqlType)
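Once you have the DataFrame, you can register it as a temporary view and analyse the tweets with plain SQL, which is what the question is after (the view name here is just an example):

df.createOrReplaceTempView("tweets")
sqlContext.sql("SELECT * FROM tweets LIMIT 10").show()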
Mudcat answered 13/11, 2017 at 14:48 Comment(2)
The method createDataFrame needs RDD[Row] and a StructType as arguments, but in my case I have an RDD[GenericRecord]. – Balmy
You're welcome! Just make sure that you review my last change; I was missing passing the values to the objectArray before creating the Row. – Mudcat

A combination of https://mcmap.net/q/901330/-how-to-convert-nested-avro-genericrecord-to-row and https://mcmap.net/q/911005/-how-to-convert-rdd-genericrecord-to-dataframe-in-scala works for me.

I used the following to create MySchemaConversions:

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

And then I used:

val myAvroType = SchemaConverters.toSqlType(schema).dataType
val myAvroRecordConverter = MySchemaConversions.createConverterToSQL(schema, myAvroType)

// unionedResultRdd is an RDD[GenericRecord] (the union of several RDDs)
val rowRDD = unionedResultRdd.map(record => MyObject.myConverter(record, myAvroRecordConverter))
val df = sparkSession.createDataFrame(rowRDD, myAvroType.asInstanceOf[StructType])

The advantage of having myConverter in the object MyObject is that you will not encounter serialization issues (java.io.NotSerializableException) when Spark ships the closure to the executors.

object MyObject {
  def myConverter(record: GenericRecord,
                  myAvroRecordConverter: (GenericRecord) => Row): Row =
    myAvroRecordConverter.apply(record)
}
Isaac answered 19/10, 2018 at 2:34 Comment(1)
How would we do this in Spark 3.0.1? – Dickens

Something like this may help you:

val stream = ...

// transform must return an RDD, and GenericRecord has no toSeq,
// so pull the field values out by position and build Rows:
val rowStream = stream.transform { rdd: RDD[GenericRecord] =>
  rdd.map(record => (0 until record.getSchema.getFields.size).map(i => record.get(i)))
     .map(seq => Row.fromSeq(seq))
}

// Then, inside foreachRDD, create the DataFrame with an explicit schema
// (an RDD[Row] cannot use toDF(col1, col2, ...) directly):
rowStream.foreachRDD { rdd =>
  val df = sqlContext.createDataFrame(rdd, schema)
  ...
}

I'd like to suggest an alternative approach. With Spark 2.x you can skip the whole process of creating DStreams. Instead, you can do something like this with Structured Streaming:

val df = ss.readStream
  .format("com.databricks.spark.avro")
  .load("/path/to/files")

This will give you a single DataFrame which you can query directly. Here, ss is the SparkSession instance, and /path/to/files is the place where all your Avro files are being dumped from Kafka.
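For example, you could register the stream as a view and aggregate it with SQL (a sketch; the view name, the lang column, and the console sink are assumptions, not part of the original setup):

df.createOrReplaceTempView("tweets")
ss.sql("SELECT lang, COUNT(*) AS cnt FROM tweets GROUP BY lang")
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()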

PS: You may need to add the spark-avro dependency:

libraryDependencies += "com.databricks" %% "spark-avro" % "4.0.0"

Hope this helped. Cheers

Lailaibach answered 13/11, 2017 at 19:27 Comment(1)
I do not have files, I want to convert RDD[GenericRecord] to a DataFrame. – Balmy

You can use createDataFrame(rowRDD: RDD[Row], schema: StructType), which is available on the SQLContext object. An example that converts the RDD of an existing DataFrame:

import sqlContext.implicits._
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Note that there is no need to explicitly set any schema column: we reuse the old DataFrame's schema, which is of class StructType and can easily be extended. However, this approach is sometimes not possible, and in some cases it can be less efficient than other approaches.

Amersham answered 13/11, 2017 at 14:17 Comment(2)
I don't have an old DataFrame. I just have an RDD[GenericRecord]. – Balmy
You create your schema yourself. Example: val innerSchema = StructType(Array(StructField("value", StringType), StructField("count", LongType))) – Amersham
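Spelled out as a runnable sketch (the column names come from the comment above; the field accessors are assumptions about the Avro schema):

import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Build the schema by hand when there is no existing DataFrame to copy it from
val innerSchema = StructType(Array(
  StructField("value", StringType),
  StructField("count", LongType)
))

// Avro strings arrive as org.apache.avro.util.Utf8, hence the toString
val rowRDD = genericRecordRDD.map { r =>
  Row(r.get("value").toString, r.get("count").asInstanceOf[Long])
}
val df = sqlContext.createDataFrame(rowRDD, innerSchema)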
