Trying to turn a blob into multiple columns in Spark

I have a serialized blob and a function that converts it into a Java Map. I have registered the function as a UDF and tried to use it in Spark SQL as follows:

sqlCtx.udf.register("blobToMap", Utils.blobToMap)
val df = sqlCtx.sql(""" SELECT mp['c1'] as c1, mp['c2'] as c2 FROM
                        (SELECT *, blobToMap(payload) AS mp FROM t1) a """)
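For context, the UDF is roughly shaped like this (a simplified sketch; the exact blob format and the map's value type are assumptions here, the real deserialization is omitted):

// Simplified sketch of the UDF - the actual deserialization and the Map's
// value type depend on the blob format, which is not shown in this question.
object Utils {
  val blobToMap: Array[Byte] => Map[String, String] = (payload: Array[Byte]) =>
    deserializePayload(payload)

  // hypothetical helper standing in for the real (heavy) deserializer
  private def deserializePayload(bytes: Array[Byte]): Map[String, String] = ???
}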

This works, but for some reason the very heavy blobToMap function runs twice for every row, and in reality I extract 20 fields, so it runs 20 times per row. I saw the suggestions in Derive multiple columns from a single column in a Spark DataFrame, but they really don't scale - I don't want to create a class every time I need to extract data.

How can I force Spark to do what's reasonable? I tried splitting it into two stages. The only thing that worked was caching the inner select, but that's not feasible either, because the blob is really big and I only need a few dozen fields from it.
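For reference, the caching workaround mentioned above looked roughly like this (a sketch, not the exact code):

// Caching materializes the mp column the first time it is computed, so the
// later field lookups reuse it instead of re-running the UDF - but it keeps
// the whole (large) map per row in memory, which is the problem.
val withMap = sqlCtx.sql("SELECT *, blobToMap(payload) AS mp FROM t1").cache()
withMap.registerTempTable("t1_with_mp")
val slim = sqlCtx.sql("SELECT mp['c1'] AS c1, mp['c2'] AS c2 FROM t1_with_mp")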

Kata answered 4/1, 2016 at 9:1

I'll answer myself, hoping it helps someone. After dozens of experiments I was able to force Spark to evaluate the UDF and turn the blob into a Map once, instead of recalculating it over and over for every key request, by splitting the query and doing an ugly trick: converting to an RDD and back to a DataFrame:

// converting to an RDD and back severs the query plan, so blobToMap is
// evaluated once per row before the individual fields are selected
val df1 = sqlCtx.sql("SELECT *, blobToMap(payload) AS mp FROM t1")
sqlCtx.createDataFrame(df1.rdd, df1.schema).registerTempTable("t1_with_mp")
val final_df = sqlCtx.sql("SELECT mp['c1'] AS c1, mp['c2'] AS c2 FROM t1_with_mp")
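The reason this helps: in the original query the optimizer inlines the blobToMap(payload) alias into every mp['...'] expression, so each field lookup triggers its own evaluation of the UDF. Routing the rows through an RDD gives Spark a plain relation whose mp column is already materialized, so the field lookups just read from it.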
Kata answered 6/1, 2016 at 15:12
