How to convert a nested Avro GenericRecord to Row

I have code that converts my Avro record to a Spark Row using the function avroToRowConverter():

directKafkaStream.foreachRDD(rdd -> {
    JavaRDD<Row> newRDD = rdd.map(x -> {
        Injection<GenericRecord, byte[]> recordInjection =
                GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
        return avroToRowConverter(recordInjection.invert(x._2).get());
    });
});

This function does not work for a nested schema (TYPE = UNION): for a union or record field, avroRecord.get(field.pos()) returns another GenericRecord, which is copied into the Row as-is instead of being converted into a nested Row.
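For illustration, the kind of schema that breaks it looks roughly like this (a hypothetical example built with Avro's SchemaBuilder; my actual "poc2" schema comes from the schema registry):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Hypothetical nested schema: a record wrapped in a ["null", record] union.
Schema nestedSchema = SchemaBuilder.record("Outer").fields()
        .requiredString("id")
        .name("payload").type().unionOf()
            .nullType().and()
            .record("Inner").fields()
                .requiredString("value")
            .endRecord()
        .endUnion().nullDefault()
        .endRecord();

The converter itself: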

private static Row avroToRowConverter(GenericRecord avroRecord) {
    if (null == avroRecord) {
        return null;
    }
    //GenericData
    Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
    StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
    for (Schema.Field field : avroRecord.getSchema().getFields()) {

        if(field.schema().getType().toString().equalsIgnoreCase("STRING") || field.schema().getType().toString().equalsIgnoreCase("ENUM")){
            objectArray[field.pos()] = ""+avroRecord.get(field.pos());
        }else {
            objectArray[field.pos()] = avroRecord.get(field.pos());
        }
    }

    return new GenericRowWithSchema(objectArray, structType);
}

Can anyone suggest how I can convert a complex (nested) schema to a Row?

Rehearse answered 16/2, 2018 at 13:40 Comment(1)
check out my answer on converting RDD[GenericRecord] to DF: https://mcmap.net/q/911005/-how-to-convert-rdd-genericrecord-to-dataframe-in-scala – Calla

There is SchemaConverters.createConverterToSQL, but unfortunately it is not public. There are PRs to make it public, but they were never merged.

There is a workaround that we used, though.

You can expose it by creating an object in the com.databricks.spark.avro package:

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
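  // createConverterToSQL is not public in spark-avro, so this wrapper has to live
  // inside the com.databricks.spark.avro package to be allowed to call it.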
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

Then you can use it in your code like this:

final DataType myAvroType = SchemaConverters.toSqlType(MyAvroRecord.getClassSchema()).dataType();

final Function1<GenericRecord, Row> myAvroRecordConverter =
        MySchemaConversions.createConverterToSQL(MyAvroRecord.getClassSchema(), myAvroType);

Row[] convertAvroRecordsToRows(List<GenericRecord> records) {
    return records.stream().map(myAvroRecordConverter::apply).toArray(Row[]::new);
}

For one record you can just call it like this:

final Row row = myAvroRecordConverter.apply(record);
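To tie this back to the streaming code in the question, the converter can be applied inside the map. This is only a sketch that reuses the names from the question (directKafkaStream, SchemaRegstryClient, the Bijection Injection); in a real job you would build the schema and the converter once per partition, or broadcast them, rather than per record:

directKafkaStream.foreachRDD(rdd -> {
    JavaRDD<Row> rows = rdd.map(x -> {
        // Same Bijection/schema-registry setup as in the question.
        Schema avroSchema = SchemaRegstryClient.getLatestSchema("poc2");
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);
        GenericRecord record = recordInjection.invert(x._2).get();

        // Build a converter for this schema and turn the record into a Row;
        // the generated converter also handles nested records and unions.
        DataType sparkType = SchemaConverters.toSqlType(avroSchema).dataType();
        Function1<GenericRecord, Row> converter =
                MySchemaConversions.createConverterToSQL(avroSchema, sparkType);
        return converter.apply(record);
    });
});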
Fallfish answered 16/2, 2018 at 13:53 Comment(8)
I was able to compile everything up to the line before the return, but since I am new to Scala I do not understand the return statement. What should I do to return the org.apache.spark.sql.Row from this function? – Rehearse
Replace ... with your list of records. I put it there as a placeholder. – Fallfish
For one Avro record just call myAvroRecordConverter.apply(avroRecord). – Fallfish
myAvroRecordConverter.apply(avroRecord) is the one I was looking for, thanks a lot! – Rehearse
Please upvote and accept the answer if it solves your problem. – Fallfish
I've accepted the answer, and sorry I can't upvote since I have less than 15 reputation currently. – Rehearse
The resulting Row does not have a schema associated with it; is there a way to retain the schema too? – Stantonstanway
@Stantonstanway maybe this can help: #33935115 – Fallfish
