How to convert a struct field in a Row to an avro record in Spark Java

I have a use case where I want to convert a struct field to an Avro record. The struct field originally maps to an Avro type: the input data consists of Avro files, and the struct field corresponds to a field in the input Avro records.

Below is what I want to achieve in pseudocode.

Dataset<Row> data = loadInput(); // data is of form (foo, bar, myStruct) from avro data.

// do some joins to add more data
data = doJoins(data); // now data is of form (a, b, myStruct)

// transform Dataset<Row> to Dataset<MyType>
Dataset<MyType> myData = data.map(row -> myUDF(row), encoderOfMyType);

// method `myUDF` definition
MyType myUDF(Row row) {
  String a = row.getAs("a");
  String b = row.getAs("b");

  // MyStruct is the generated avro class that corresponds to field myStruct 
  MyStruct myStruct = convertToAvro(row.getAs("myStruct"));

  return generateMyType(a, b, myStruct);
}

My question is: how can I implement the convertToAvro method in the pseudocode above?

Flabellum answered 16/9, 2020 at 17:30 Comment(0)

From the documentation:

The Avro package provides function to_avro to encode a column as binary in Avro format, and from_avro() to decode Avro binary data into a column. Both functions transform one column to another column, and the input/output SQL data type can be a complex type or a primitive type.

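The function to_avro can act as a replacement for the convertToAvro method. Note that it encodes the struct column as Avro binary rather than returning an instance of the generated MyStruct class: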

import static org.apache.spark.sql.avro.functions.*;

//put the Avro schema of the struct column into a string
//in my example I assume that the struct consists of two fields:
//a long field (s1) and a string field (s2), both nullable
String schema = "{\"type\":\"record\",\"name\":\"mystruct\"," +
        "\"namespace\":\"topLevelRecord\",\"fields\":[{\"name\":\"s1\"," +
        "\"type\":[\"long\",\"null\"]},{\"name\":\"s2\",\"type\":" +
        "[\"string\",\"null\"]}]}";

data = ...

//add an additional column containing the struct as binary column
Dataset<Row> data2 = data.withColumn("to_avro", to_avro(data.col("myStruct"), schema));
data2.printSchema();
data2.show(false);

Output:

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- mystruct: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)
 |-- to_avro: binary (nullable = true)

+----+----+----------+----------------------------+
|a   |b   |mystruct  |to_avro                     |
+----+----+----------+----------------------------+
|foo1|bar1|[1, one]  |[00 02 00 06 6F 6E 65]      |
|foo2|bar2|[3, three]|[00 06 00 0A 74 68 72 65 65]|
+----+----+----------+----------------------------+
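
To make the to_avro column above easier to read, here is a minimal, library-free sketch that decodes those bytes by hand. It assumes the two-field schema from this answer, where each field is a union with the value branch listed first. The class AvroBytesDemo and its methods are hypothetical helpers written for illustration, not part of Spark or Avro; in real code you would use from_avro or Avro's own decoders.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: hand-decodes the Avro binary produced for the
// record {s1: ["long","null"], s2: ["string","null"]} assumed in this answer.
final class AvroBytesDemo {
    private final byte[] buf;
    private int pos = 0;

    AvroBytesDemo(byte[] buf) {
        this.buf = buf;
    }

    // Avro stores int/long values as variable-length zigzag integers.
    long readLong() {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo the zigzag mapping
    }

    // Avro strings are a long byte count followed by UTF-8 bytes.
    String readString() {
        int len = (int) readLong();
        String s = new String(buf, pos, len, StandardCharsets.UTF_8);
        pos += len;
        return s;
    }

    // A union is written as the branch index, then the value of that branch.
    // Branch 0 is the value type here; branch 1 is null and has no payload.
    static String decode(byte[] bytes) {
        AvroBytesDemo in = new AvroBytesDemo(bytes);
        Long s1 = in.readLong() == 0 ? Long.valueOf(in.readLong()) : null;
        String s2 = in.readLong() == 0 ? in.readString() : null;
        return "[" + s1 + ", " + s2 + "]";
    }

    public static void main(String[] args) {
        byte[] row1 = {0x00, 0x02, 0x00, 0x06, 0x6F, 0x6E, 0x65};
        System.out.println(decode(row1)); // prints [1, one]
    }
}
```

Reading the first row byte by byte: 00 selects the long branch, 02 is the zigzag encoding of 1, the next 00 selects the string branch, 06 is the zigzag encoding of the length 3, and 6F 6E 65 is "one" in UTF-8.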

To convert the avro column back, the function from_avro can be used:

Dataset<Row> data3 = data2.withColumn("from_avro", from_avro(data2.col("to_avro"), schema));
data3.printSchema();
data3.show();

Output:

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- mystruct: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)
 |-- to_avro: binary (nullable = true)
 |-- from_avro: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)

+----+----+----------+--------------------+----------+
|   a|   b|  mystruct|             to_avro| from_avro|
+----+----+----------+--------------------+----------+
|foo1|bar1|  [1, one]|[00 02 00 06 6F 6...|  [1, one]|
|foo2|bar2|[3, three]|[00 06 00 0A 74 6...|[3, three]|
+----+----+----------+--------------------+----------+

A word about the UDF: in the question, the conversion to the Avro format happens inside the UDF. I would keep only the actual business logic in the UDF and do the format conversion outside of it, which separates the two concerns. If necessary, you can drop the original column mystruct after creating the Avro column.

Unwary answered 22/9, 2020 at 19:34 Comment(1)
I came up with another approach, described here: linkedin.com/feed/update/urn:li:activity:6712826689614880768. It is based on Encoders and avoids the temporary conversion to Avro binary. It has not been tested, but it looks promising to me. (Flabellum)
