DataFrame lookup and optimization

I am using Spark SQL 2.4.3 with Java. I have the scenario below:

val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate",  "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate",  "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")
df.show

// this lookup data is populated from the DB

val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")
codeValudeDf.show

I need to map each value to its lookup "code" in the final output, but only for those rows which have "code" as "score" in the "data" dataframe.

How can I build a lookup hashmap from codeValudeDf, so that I can get the output below?

+---+-----+------+------+------+
| id| code|entity|value1|value2|
+---+-----+------+------+------+
| 20|score|school|     a|   aaa|
| 21|score|school|    aa|    aa|
| 22| rate|school|    11|    14|
| 23|score|school|  aaaa|     a|
| 24| rate|school|    12|    12|
| 25|score|school|  aaaa|     a|
+---+-----+------+------+------+

Is there any way to make this lookup optimal, i.e. so that I do not have to pull the lookup data from the DB every time?

Frond asked 16/7, 2020 at 10:54 Comment(1)
Does this answer your question? Lookup in Spark dataframes – Halve

If the lookup data is small, you can create a Map and broadcast it. The broadcast map can then easily be used in a UDF, as below.

Load the test data provided

val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate",  "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate",  "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")
df.show
/**
  * +---+-----+------+------+------+
  * | id| code|entity|value1|value2|
  * +---+-----+------+------+------+
  * | 20|score|school|    14|    12|
  * | 21|score|school|    13|    13|
  * | 22| rate|school|    11|    14|
  * | 23|score|school|    11|    14|
  * | 24| rate|school|    12|    12|
  * | 25|score|school|    11|    14|
  * +---+-----+------+------+------+
  */

// this lookup data is populated from the DB

val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")
codeValudeDf.show
/**
  * +----+-----+
  * |code|value|
  * +----+-----+
  * |aaaa|   11|
  * | aaa|   12|
  * |  aa|   13|
  * |   a|   14|
  * +----+-----+
  */

Now create the broadcast map and use it in a UDF:


// Build a Map(value -> code) on the driver and broadcast it to the executors.
val lookUp = spark.sparkContext
  .broadcast(codeValudeDf.map { case Row(code: String, value: Integer) => value -> code }
    .collect().toMap)

// UDF that resolves a numeric value to its code via the broadcast map.
val look_up = udf((value: Integer) => lookUp.value.get(value))

// Replace value1/value2 with the looked-up code for "score" rows only;
// all other rows keep their original value, cast to string.
df.withColumn("value1",
    when($"code" === "score", look_up($"value1")).otherwise($"value1".cast("string")))
  .withColumn("value2",
    when($"code" === "score", look_up($"value2")).otherwise($"value2".cast("string")))
  .show(false)
/**
  * +---+-----+------+------+------+
  * |id |code |entity|value1|value2|
  * +---+-----+------+------+------+
  * |20 |score|school|a     |aaa   |
  * |21 |score|school|aa    |aa    |
  * |22 |rate |school|11    |14    |
  * |23 |score|school|aaaa  |a     |
  * |24 |rate |school|12    |12    |
  * |25 |score|school|aaaa  |a     |
  * +---+-----+------+------+------+
  */
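Alternatively, since codeValudeDf is small, you could skip the UDF entirely and let Spark broadcast the lookup table in a join. A minimal sketch (the lu_code/lu_value renames are mine, to avoid clashing with df's own "code" column):

import org.apache.spark.sql.functions.{broadcast, col, when}

// Broadcast the small lookup table; rename its columns to avoid collisions.
val lut = broadcast(codeValudeDf.toDF("lu_code", "lu_value"))

df.join(lut, col("value1") === col("lu_value"), "left")
  .withColumn("value1",
    when(col("code") === "score", col("lu_code")).otherwise(col("value1").cast("string")))
  .drop("lu_code", "lu_value")
  .join(lut, col("value2") === col("lu_value"), "left")
  .withColumn("value2",
    when(col("code") === "score", col("lu_code")).otherwise(col("value2").cast("string")))
  .drop("lu_code", "lu_value")
  .show(false)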


Eskil answered 16/7, 2020 at 11:28 Comment(9)
Could you please tell me what this is doing exactly: .map{case Row(code: String, value: Integer) => value -> code} – Frond
It maps over the Dataset[Row] to convert it into a Dataset[(Integer, String)], then collect turns that into an Array[(Integer, String)], and toMap into a Map[Integer, String]. – Eskil
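Spelled out with explicit types, that chain is roughly (a sketch; assumes spark.implicits._ is in scope):

import org.apache.spark.sql.{Dataset, Row}

// Each stage of the chain, with its type made explicit.
val ds: Dataset[(Integer, String)] =
  codeValudeDf.map { case Row(code: String, value: Integer) => value -> code }
val arr: Array[(Integer, String)] = ds.collect()   // pull the pairs to the driver
val m: Map[Integer, String] = arr.toMap            // 11 -> "aaaa", 12 -> "aaa", ...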
Thank you... when I tried the solution it gave the error org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.DataFrameReader – Frond
Try making the class in which you call spark.broadcast serializable. – Eskil
I am not using any class; I am executing this in my Zeppelin notebook and encountered this error. – Frond
I don't have a Zeppelin environment to support you. Can you try it as a test case? – Eskil
Any suggestion on this? #63075069 – Frond
I've already answered a similar query here: https://mcmap.net/q/540946/-issue-in-union-with-empty-dataframe – Eskil
Hi, I have a use case like this, any advice please? #63137937 – Frond

Using the broadcast map indeed looks like a wise decision, as you do not need to hit your database to pull the lookup data every time.

Here I have solved the problem using a plain key-value map inside a UDF. I am unable to compare its performance against the broadcast-map approach, but would welcome input from Spark experts.

Step# 1: Building KeyValueMap -

val data = List(
  ("20", "score", "school", 14, 12),
  ("21", "score", "school", 13, 13),
  ("22", "rate",  "school", 11, 14),
  ("23", "score", "school", 11, 14),
  ("24", "rate",  "school", 12, 12),
  ("25", "score", "school", 11, 14)
)
val df = data.toDF("id", "code", "entity", "value1", "value2")

val ll = List(
  ("aaaa", 11),
  ("aaa", 12),
  ("aa", 13),
  ("a", 14)
)
val codeValudeDf = ll.toDF("code", "value")

// Collect the numeric values (as strings) and the codes, then zip them into a Map.
val Keys = codeValudeDf.select("value").collect().map(_(0).toString).toList
val Values = codeValudeDf.select("code").collect().map(_(0).toString).toList
val KeyValueMap = Keys.zip(Values).toMap
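As an aside, zipping two separately collected columns relies on both collect() calls returning rows in the same order; a single-pass sketch avoids that assumption:

// Collect (value, code) pairs in one action so keys and values cannot diverge.
val KeyValueMap = codeValudeDf
  .select("value", "code")
  .collect()
  .map(r => r.get(0).toString -> r.getString(1))
  .toMap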

Step# 2: Creating UDF

// Returns the looked-up code for "score" rows; passes any other value through unchanged.
def CodeToValue(code: String, key: String): String = {
  if (key == null) return ""
  if (code != "score") return key
  KeyValueMap.getOrElse(key, "not found!")
}

val CodeToValueUDF = udf(CodeToValue(_: String, _: String): String)

Step# 3: Adding derived columns using UDF in original dataframe

val newdf = df.withColumn("Col1", CodeToValueUDF(col("code"), col("value1")))
val finaldf = newdf.withColumn("Col2", CodeToValueUDF(col("code"), col("value2")))

finaldf.show(false)

+---+-----+------+------+------+----+----+
| id| code|entity|value1|value2|Col1|Col2|
+---+-----+------+------+------+----+----+
| 20|score|school|    14|    12|   a| aaa|
| 21|score|school|    13|    13|  aa|  aa|
| 22| rate|school|    11|    14|  11|  14|
| 23|score|school|    11|    14|aaaa|   a|
| 24| rate|school|    12|    12|  12|  12|
| 25|score|school|    11|    14|aaaa|   a|
+---+-----+------+------+------+----+----+
Furry answered 16/7, 2020 at 22:40 Comment(3)
You shouldn't use local variables directly inside a UDF, even though it is possible. Spark has to pass a copy of the variable to each task, which is inefficient for big data, where plenty of tasks execute on one executor or another. To make it more performant you should always broadcast it. More info: spark.apache.org/docs/2.2.0/… – Eskil
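A minimal sketch of that contrast (names are illustrative):

// Closure capture: localMap is serialized into every task closure that uses the UDF.
val localMap: Map[Int, String] = Map(11 -> "aaaa", 12 -> "aaa")
val perTaskLookup = udf((v: Int) => localMap.get(v))

// Broadcast: the data ships to each executor once and is reused by all its tasks.
val bcMap = spark.sparkContext.broadcast(localMap)
val perExecutorLookup = udf((v: Int) => bcMap.value.get(v))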
Thank you @Someshwar for the insights. I have recently started learning Spark and am still in the learning phase. I agree with you. – Furry
@Shantanu Kher, can you tell me what is wrong with this broadcast variable access? #64004197 – Frond
