How to map rows to protobuf-generated class?

I need to write a job that reads a Dataset[Row] and converts it to a Dataset[CustomClass], where CustomClass is a protobuf-generated class.

val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long) =>
    CustomClass.newBuilder()
      .setF1(f1)
      .setF2(f2)
      .build()
}(protoEncoder)

However, it looks like protobuf classes are not really Java beans, and I get an NPE on the following:

val x =  Encoders.bean(classOf[CustomClass])

How does one ensure that the job can emit a Dataset[CustomClass], where CustomClass is the protobuf class? Any pointers/examples on writing a custom encoder for the class?

NPE:

val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
  ... 48 elided

The Bean encoder internally uses

JavaTypeInference.serializerFor(protoClass)

If I try to do the same in my custom encoder, I get a more descriptive error message:

Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
        at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
        at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
        at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
        at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
Winner answered 26/6, 2017 at 21:15. Comments:
Can you paste the NPE to your question? – Decomposer
Added the stack trace; pretty sure at this point this happens because protobuf classes are not valid Java beans. – Winner
@JacekLaskowski: updated both stack traces (using Encoders.bean as well as using similar code in a custom Encoder) - does this help? – Winner

My experience with Encoders is not very promising, and at this point I would recommend not spending more time on this.

I'd rather think about alternatives and work with Spark its way: map the result of the Spark computation to the protobuf-generated class at the very last step, as sketched below.
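A minimal sketch of that idea, assuming the f1/f2 columns and the CustomClass builder from the question: keep all Spark work on Rows (or a case class) and only build the protobuf objects in a final map over the underlying RDD, where no Encoder is needed.

val rows = spark.sql("select f1, f2 from tablename")   // Dataset[Row]

// Final step: build the protobuf objects outside the Dataset API, so no Encoder is required.
val protos = rows.rdd.map { row =>
  CustomClass.newBuilder()
    .setF1(row.getAs[String]("f1"))
    .setF2(row.getAs[Long]("f2"))
    .build()
}                                                       // RDD[CustomClass]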

Decomposer answered 27/6, 2017 at 1:30. Comments:
Thanks @JacekLaskowski, your guide is very helpful in general. I was hoping to be able to write these in a distributed manner to a key-value store using the map operation. My attempts so far look very similar to what you mention, but without luck. Will post an update if I get this to work. – Winner

To convert Rows to a protobuf class, you can use sparksql-protobuf.

This library provides utilities to work with Protobuf objects in SparkSQL. It provides a way to read parquet file written by SparkSQL back as an RDD of the compatible protobuf object. It can also convert RDD of protobuf objects into DataFrame.

Add the dependencies to your build.sbt file:

resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
  "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
  "org.apache.parquet" % "parquet-protobuf" % "1.8.1"
)

You can follow some examples from the library to get started

Example 1

Example 2
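A rough sketch of what reading back an RDD of protobuf objects could look like, assuming a Parquet input path and the CustomClass message from the question (the import path for ProtoParquetRDD below is taken from the library's README; verify it against the version you use):

import com.github.saurfang.parquet.proto.spark.ProtoParquetRDD

// Hypothetical input path; ProtoParquetRDD is provided by sparksql-protobuf.
val protoRdd = new ProtoParquetRDD(sc, "path/to/input.parquet", classOf[CustomClass])
// protoRdd behaves as an RDD[CustomClass] that can be transformed or written out.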

I hope this helps!

Airsick answered 26/6, 2017 at 22:23. Comments:
Thanks, I took a look at this. "It provides a way to read parquet file written by SparkSQL back as an RDD of compatible protobuf object" - this assumption is not necessarily true in my case; the underlying representation is not Parquet. – Winner
I haven't worked with Spark and protobuf, but this should help you. – Airsick
Some more context: I tried writing my own encoder with val serializer = JavaTypeInference.serializerFor(protoClass). This is what fails, as suspected: Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430) – Winner

While not a strict answer, I did find a workaround: encoders are not needed if we use RDDs.

val rows = spark.sql("select * from tablename").rdd   // RDD[Row], so no Encoder is needed
val transformedRows = rows.map {
  case Row(f1: String, f2: Long) =>
    CustomClass.newBuilder()
      .setF1(f1)
      .setF2(f2)
      .build()
}

This gives me an RDD of the Protobuf Class that I can work with.

Winner answered 27/6, 2017 at 20:18. Comments:
Why don't you directly build the RDD[Proto] you want with sparksql-protobuf (saurfang's GitHub)? – Allegra

The way I did it: I used saurfang's sparksql-protobuf library (code available on GitHub). You directly get an RDD[ProtoSchema], but it's difficult to convert to a Dataset[ProtoSchema]. I mainly used it to fetch information to append to another RDD with user-defined functions.

1: Import the library

With Maven:

<dependencies>
    <dependency>
        <groupId>com.github.saurfang</groupId>
        <artifactId>sparksql-protobuf_2.10</artifactId>
        <version>0.1.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-protobuf</artifactId>
        <version>1.9.0</version>
    </dependency>

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.5.1</version>
    </dependency>
</dependencies>
...

<repositories>
    <repository>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <id>bintray-saurfang-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/saurfang/maven</url>
    </repository>
</repositories>

2: Read data as an RDD[ProtoSchema]

val sess: SparkSession = ...
val proto_rdd = new ProtoParquetRDD[ProtoSchema](sess.sparkContext, input_path, classOf[ProtoSchema])
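If you want a DataFrame rather than the raw RDD, the library also advertises converting an RDD of protobuf objects into a DataFrame (see its description in the answer above). A hedged sketch, assuming the SQL implicits described in the sparksql-protobuf README:

// Assumed import of the library's SQL implicits, per its README.
import com.github.saurfang.parquet.proto.spark.sql._

// createDataFrame for protobuf RDDs is provided by those implicits (assumption based on the README).
val protoDF = sess.sqlContext.createDataFrame(proto_rdd)
protoDF.createOrReplaceTempView("proto_table")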

(Optional) Add a PathFilter (Hadoop API)

If you'd like to add a PathFilter class (like you used to with Hadoop), or activate other options you had with Hadoop, you can do:

sess.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)
sess.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[MyPathFiltering], classOf[PathFilter])

But do not forget to clear your Hadoop configuration, in case you want to use your SparkSession to read other things:

sess.sparkContext.hadoopConfiguration.clear()
Allegra answered 23/2, 2018 at 8:41

The default serialization doesn't work for my protobuf objects either.

However, Spark provides a generic Kryo-based encoder. So if you use

Encoders.kryo(ProtoBuffObject.class)

it works.
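A minimal sketch of that approach in Scala, assuming the CustomClass and the f1/f2 columns from the question. Note that a Kryo encoder serializes each object into a single binary column, so the resulting Dataset loses its columnar structure:

import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row}

// Kryo-based encoder for the protobuf class (CustomClass is the class from the question).
implicit val protoEncoder: Encoder[CustomClass] = Encoders.kryo(classOf[CustomClass])

// rows is the Dataset[Row] from the question; map picks up the implicit encoder.
val transformed: Dataset[CustomClass] = rows.map {
  case Row(f1: String, f2: Long) =>
    CustomClass.newBuilder().setF1(f1).setF2(f2).build()
}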

Lilas answered 4/12, 2020 at 8:34
