Impossible to operate on a custom type after it is encoded? (Spark Dataset)

Say you have this (the solution for encoding a custom type comes from this thread):

// assume we handle custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

When I do a ds.show, I get:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

I understand that this is because the contents are encoded into Spark SQL's internal binary representation. But how can I display the decoded content like this?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

UPDATE 1

Displaying the content is not the biggest issue; more importantly, the encoding can cause problems when processing the dataset. Consider this example:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c"))) 

ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this fails with: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value);
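
Printing the schema makes the cause visible: with the Kryo encoder the whole object is serialized into a single binary column named value, so there is no column i to resolve:

ds.printSchema
// root
//  |-- value: binary (nullable = true)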

Does this mean that a kryo-encoded type cannot conveniently be used in operations like joinWith?

How do we process a custom type in a Dataset then?
If we cannot process it after it's encoded, what's the point of this kryo encoding solution for custom types?!

(The solution provided by @jacek below is good to know for case class types, but it still cannot decode a custom, non-case class.)

Sammiesammons asked 3/10, 2020 at 12:41

The following worked for me, but it feels like using a high-level API to do low-level (deserialization) work.

This is not to say it should be done this way, but it shows that it's possible.

I don't know why the Kryo deserializer does not deserialize the bytes back into the object they came from; it just works this way.

One major difference between your class definition and mine is the case keyword, and that is what lets me use the following trick. Again, I have no idea exactly why it makes this possible.

scala> println(spark.version)
3.0.1

// Note that case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo deserializer gives bytes
scala> ds.printSchema
root
 |-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
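// Note: the UDF's return type (MyObj) has to be a case class so that Spark can
// derive a schema for it via reflection; with a plain class this step fails with
// "Schema for type MyObj is not supported" (see the comments below).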
val deserMyObj = udf { value: Array[Byte] => 
  import java.nio.ByteBuffer
  ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }

val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+
Petrology answered 3/10, 2020 at 19:24
Comments (9)

Thanks for the answer. Does it mean that each time we display a dataset (of a custom type, e.g. defined by a case class) or write it to disk, we have to call ks.newInstance.deserialize first? That quite confuses me about the usage of encoders in Dataset. – Sammiesammons
Not sure I understood you correctly, but since you've used kryo things have changed a bit. Stick to a case class and import spark.implicits._ and you should be just fine. In other words, why have you considered Encoders.kryo? I rarely see it used. – Petrology
For a type that is not covered by the predefined encoders, it seems we have to use kryo, no? If we use it, ds is in binary format rather than tabular, which prevents further transformation and data processing (imagine you want to do a simple joinWith on the column i with another ds: you have to call ks.newInstance.deserialize first, which is really not convenient). – Sammiesammons
For unsupported types you have to convert them to ones that are supported. No need for kryo. – Petrology
In fact, when I pasted your code into spark-shell (Spark 3.0.1), I got this error at the .asInstanceOf[MyObj] step: java.lang.UnsupportedOperationException: Schema for type MyObj is not supported at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:743) at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69) – Sammiesammons
@Sammiesammons You may have missed the // Note that case keyword comment in my code, didn't you? – Petrology
That is true! But then it is not the solution I'm looking for, because I wouldn't encode a case class object using kryo; with import spark.implicits._ I already get the benefits of Spark's default encoders. I'm looking for a solution to decode/deserialize a custom type encoded by kryo. – Sammiesammons
I cannot imagine how I can work on the encoded custom objects if I don't deserialize them. Put differently, what's the point of using kryo if I cannot decode back? – Sammiesammons
Thank you @Jacek, your comment about converting the custom type into a case class inspired me! – Sammiesammons
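
For reference, a minimal sketch of that conversion approach (the wrapper name MyObjRow and the mapping are illustrative, not from the thread): map the plain class into a case class before creating the Dataset, so the built-in product encoder is used instead of Kryo and the columns i and j stay queryable.

// Hypothetical wrapper: a case class mirror of MyObj so Spark's built-in
// product encoder can be used instead of Kryo
case class MyObjRow(i: Int, j: String)

import spark.implicits._  // provides .toDS and the case class encoder

// Map the plain objects into the case class before creating the Datasets
val ds  = Seq(new MyObj(1, "a"), new MyObj(2, "b"), new MyObj(3, "c"))
            .map(o => MyObjRow(o.i, o.j)).toDS
val ds2 = Seq(new MyObj(2, "a"), new MyObj(6, "b"), new MyObj(5, "c"))
            .map(o => MyObjRow(o.i, o.j)).toDS

// i and j are now real columns, so joinWith resolves them as expected
ds.joinWith(ds2, ds("i") === ds2("i"), "inner").show()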
