How to create UDF from Scala methods (to compute md5)?

I would like to build one UDF from two already working functions. I'm trying to calculate an MD5 hash as a new column on an existing Spark DataFrame.

import java.security.MessageDigest

def md5(s: String): String = { toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))) }
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
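
For reference, a quick spark-shell check shows the helpers behave as expected (900150983cd24fb0d6963f7d28e17f72 is the standard RFC 1321 MD5 test vector for "abc"):

scala> md5("abc")
res0: String = 900150983cd24fb0d6963f7d28e17f72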

Structure (what I have so far):

val md5_hash: // UDF Implementation
val sqlfunc = udf(md5_hash)
val new_df = load_df.withColumn("New_MD5_Column", sqlfunc(col("Duration")))

Unfortunately, I don't know how to properly implement the function as a UDF.

Poppo answered 29/6, 2017 at 7:37 Comment(0)

You can use the following UDF function, named md5:

import java.security.MessageDigest
import org.apache.spark.sql.functions._

def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
def md5 = udf((s: String) => toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))))

val new_df = load_df.withColumn("New_MD5_Column", md5(col("Duration")))
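
If you also want to call the same logic from Spark SQL, a possible sketch is to register the lambda under a name and query it. This assumes a SparkSession called spark; the function name md5_hex and the view name durations are just examples:

// toHex is the helper defined above; register the same hashing lambda for SQL use
spark.udf.register("md5_hex", (s: String) => toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))))
load_df.createOrReplaceTempView("durations")
spark.sql("SELECT Duration, md5_hex(CAST(Duration AS STRING)) AS New_MD5_Column FROM durations").show()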
Purusha answered 29/6, 2017 at 7:44 Comment(0)

Why not use the built-in md5 function?

md5(e: Column): Column Calculates the MD5 digest of a binary column and returns the value as a 32 character hex string.

You could then use it as follows:

val new_df = load_df.withColumn("New_MD5_Column", md5($"Duration"))

You have to make sure that the column is of binary type, so if it's int you may see the following error:

org.apache.spark.sql.AnalysisException: cannot resolve 'md5(Duration)' due to data type mismatch: argument 1 requires binary type, however, 'Duration' is of int type.;;
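
A quick way to confirm what type the column actually has is to print the schema first (a sketch; the exact output, including nullability, depends on your data):

load_df.printSchema()
// root
//  |-- Duration: integer (nullable = true)   <- int, hence the md5 type mismatch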

You should then change the type to be md5-compatible, i.e. binary type, using the bin function.

bin(e: Column): Column An expression that returns the string representation of the binary value of the given long column. For example, bin("12") returns "1100".

A solution could be as follows then:

val solution = load_df.
  withColumn("bin_duration", bin($"duration")).
  withColumn("md5", md5($"bin_duration"))
scala> solution.show(false)
+--------+------------+--------------------------------+
|Duration|bin_duration|md5                             |
+--------+------------+--------------------------------+
|1       |1           |c4ca4238a0b923820dcc509a6f75849b|
+--------+------------+--------------------------------+

You could also "chain" the functions together and do the conversion and the MD5 calculation in a single withColumn, but I prefer to keep the steps separate in case there's an issue to resolve; having intermediate steps usually helps (see the sketch just below for the chained form).
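
For completeness, the chained one-liner would look like this (the same bin + md5 combination, just in a single withColumn):

val solution = load_df.withColumn("md5", md5(bin($"Duration")))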

Performance

The reason why you would consider using the built-in functions bin and md5 over custom user-defined functions (UDFs) is that you could get better performance, as Spark SQL stays in full control and does not add extra steps for serialization to and deserialization from its internal row representation.
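
One way to see the difference is to compare the physical plans: with the built-ins the whole expression stays inside Spark SQL, while a Scala UDF shows up as an opaque call in the plan. A sketch (md5Udf here stands for the custom UDF from the other answer):

load_df.withColumn("md5", md5(bin($"Duration"))).explain()                       // built-in expressions only
load_df.withColumn("md5", md5Udf(col("Duration").cast("string"))).explain()      // plan contains a UDF node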

That may not be the case here, but the built-in functions still require less code to import and work with.

Rigmarole answered 30/6, 2017 at 1:4 Comment(3)
Already tried this but I get the following error: Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'md5(Duration)' due to data type mismatch: argument 1 requires binary type, however, 'Duration' is of int type.;; – Poppo
The hashing functions only work on Strings, as far as I know, but the conversion shouldn't be an issue - sadly it's not implicitly handled. – Cist
Actually the comments mention that it works on Binary type but I've found it to work with String as well. – Onida
