Spark Dataframe Random UUID changes after every transformation/action
B

4

15

I have a Spark DataFrame with a column that holds a generated UUID. However, each time I run an action or transformation on the DataFrame, the UUID changes.

How do I generate the UUID only once and have it remain static thereafter?

Some sample code to reproduce my issue is below:

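import java.util.UUID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

def process(spark: SparkSession): Unit = {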

  import spark.implicits._

  val sc = spark.sparkContext
  val sqlContext = spark.sqlContext
  sc.setLogLevel("OFF")

  // create dataframe
  val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
  df.createOrReplaceTempView("df")
  df.show(false)

  // define a UDF that creates a random UUID
  val generateUUID = udf(() => UUID.randomUUID().toString)

  // generate UUID for new column
  val dfWithUuid = df.withColumn("new_uuid", generateUUID())
  dfWithUuid.show(false)
  dfWithUuid.show(false)    // uuid is different

  // new transformations also change the uuid
  val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
  dfWithUuidWithNewCol.show(false)
}

The output is:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b   |2   |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c   |3   |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b   |2   |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c   |3   |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b   |2   |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c   |3   |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+

Note that the UUID is different at each step.

Bedight answered 22/3, 2017 at 19:26 Comment(0)
B
6

This is a very old question, but here is what worked for me, in case it helps someone.

You can use the expr function with the Spark SQL uuid() function, as below, to generate unique GUIDs that do not change across transformations.

import org.apache.spark.sql.functions._  
// create dataframe  
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")   
df.createOrReplaceTempView("df")   
df.show(false)

// generate UUID for new column   
val dfWithUuid = df.withColumn("new_uuid", expr("uuid()"))
dfWithUuid.show(false)
dfWithUuid.show(false)    

// new transformations 
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)

The output is as below:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|2.0 |
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|3.0 |
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|4.0 |
+----+----+------------------------------------+----+
Beading answered 9/6, 2020 at 10:38 Comment(2)
It throws an exception for me: error: not found: value expr - Jammiejammin
I think you are missing the import statement at the top of the code: import org.apache.spark.sql.functions._ - Beading
C
14

This is expected behavior. User-defined functions have to be deterministic:

The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.

If you want to include a non-deterministic function and preserve its output, you should write the intermediate data to persistent storage and read it back. Checkpointing or caching may work in some simple cases, but it won't be reliable in general.
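A minimal sketch of the write-and-read-back approach, assuming Spark is on the classpath and Parquet is an acceptable storage format. The object name MaterializeUuids, the helper materialize, and the temporary output path are hypothetical names chosen for illustration, not part of the original answer.

```scala
import java.nio.file.Files
import java.util.UUID

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object MaterializeUuids {
  // Non-deterministic UDF, as in the question.
  val generateUUID = udf(() => UUID.randomUUID().toString)

  // Writes the DataFrame to Parquet and reads it back, so that later
  // actions read the stored UUIDs instead of re-running the UDF.
  // `path` is a hypothetical output location chosen by the caller.
  def materialize(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    df.write.mode("overwrite").parquet(path)
    spark.read.parquet(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("uuid-materialize")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("a", "1"), ("b", "2"), ("c", "3")).toDF("col1", "col2")
    val path = Files.createTempDirectory("uuid-demo").resolve("df_with_uuid").toString

    val stable = materialize(spark, df.withColumn("new_uuid", generateUUID()), path)
    stable.show(false)
    stable.show(false) // same UUIDs: they are read from the Parquet files
    spark.stop()
  }
}
```

Any transformation applied to the DataFrame returned by materialize starts from the stored files, so the UUID column no longer depends on how often the UDF would otherwise be re-evaluated.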

If the upstream process is deterministic (for starters, there is no shuffle), you could try to use the rand function with a seed, convert the result to a byte array, and pass it to UUID.nameUUIDFromBytes.
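The conversion step can be sketched in plain Scala, without Spark. The object name SeededUuid and the method fromDouble are hypothetical; the sketch assumes the seeded random value arrives as a Double, which is what Spark's rand returns.

```scala
import java.nio.ByteBuffer
import java.util.UUID

object SeededUuid {
  // Packs a Double into its 8-byte representation and derives a
  // name-based (version 3) UUID from those bytes. The same input
  // always yields the same UUID.
  def fromDouble(value: Double): String = {
    val bytes = ByteBuffer.allocate(8).putDouble(value).array()
    UUID.nameUUIDFromBytes(bytes).toString
  }
}
```

In Spark this could be wrapped as udf(SeededUuid.fromDouble _) and applied to rand(seed). That wiring is an assumption, and the result is only as stable as the upstream row order. Two rows that happen to receive the same random value would also receive the same UUID, so uniqueness is not guaranteed the way it is with UUID.randomUUID().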

See also: About how to add a new column to an existing DataFrame with random values in Scala

Note: SPARK-20586 introduced a deterministic flag, which can disable certain optimizations, but it is not clear how it behaves when data is persisted and an executor is lost.

Carlicarlick answered 22/3, 2017 at 20:1 Comment(3)
Is this a problem only with UDFs, or can it also appear if we add a column in a map function? - Assoil
@abalcerek, I got this problem in a map function (when two "collect" calls were made). - Raymonderaymonds
I can confirm the non-deterministic flag does not behave as advertised when applied to a UDF generating a random UUID with UUID.randomUUID(). The UDF may still be evaluated multiple times. This might be related to SPARK-23599. - Shippee
C
2

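Here is a PySpark version:

from pyspark.sql import functions as f

# add a UUID column using the Spark SQL uuid() function
pdataDF = dataDF.withColumn("uuid_column", f.expr("uuid()"))
display(pdataDF)  # notebook helper (for example in Databricks); elsewhere use pdataDF.show(truncate=False)
# saving the result as a table stores the generated UUIDs
pdataDF.write.mode("overwrite").saveAsTable("tempUuidCheck")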
Claudette answered 10/12, 2021 at 10:54 Comment(0)
F
-1

Try this one:

df.withColumn("XXXID", lit(java.util.UUID.randomUUID().toString))

It behaves differently from:

val generateUUID = udf(() => java.util.UUID.randomUUID().toString)
df.withColumn("XXXCID", generateUUID() )

I hope this helps.

Pawel

Fardel answered 18/5, 2020 at 7:9 Comment(1)
lit will produce the same UUID for all rows. - Tripalmitin
