pyspark - get consistent random value across Spark sessions

I want to add a column of random values to a dataframe (which has an id for each row) for something I am testing. I am struggling to get reproducible results across Spark sessions, i.e. the same random value for each row id. I can reproduce the results by using

from pyspark.sql.functions import rand

new_df = my_df.withColumn("rand_index", rand(seed = 7))

but it only works when I run it in the same Spark session. I do not get the same results once I relaunch Spark and run my script.

I also tried defining a UDF, to test whether I can generate random integers within an interval, using Python's random module with random.seed set:

import random
from pyspark.sql.types import LongType

random.seed(7)
spark.udf.register("getRandVals", lambda x, y: random.randint(x, y), LongType())

but to no avail.

Is there a way to ensure reproducible random number generation across Spark sessions, such that a row id gets the same random value each time? I would really appreciate some guidance :) Thanks for the help!

Andryc answered 27/11, 2019 at 20:21 Comment(11)
pls show code...Tortricid
I am wondering whether there is an obvious oversight in how it all works.Tortricid
i already shared the pyspark code snippet showing how i set the seed for rand() and try generating column of random values, what specifically are you looking for?Andryc
added another code snippet to my post to show udf i defined using python's random module, lemme know please anything else i can share. Also I'd appreciate if the downvotes are explained so i dont repeat my mistake :)Andryc
are you running same input?Tortricid
I just ran this on a Databricks cluster, re-started the cluster and ran again and got same results... You may have different data and partitioning and get different data therefore...?Tortricid
yes its got same input each time - i updated the question with the detail that in my case, the dataframe has an id for each row and goal is to get same random value for a row across Spark sessions. Apologies that initial post didn't specify that, i updated itAndryc
If I understood your question correctly then you may want to sort your data on id column (assumption - I hope id column is unique in nature) and then insert random value column. Something like - df.orderBy(df.id.desc()).withColumn("rand_index", rand(seed=7))?Anomalous
@Anomalous yes the id is unique. So you're suggesting to sort data on id to control data distribution and thereby ensure a row/id gets same random value assigned each time - do i get it right?Andryc
with the dataframe having say a 100M rows, performance/runtime would take a hit coz of sorting right? but guess its the necessary cost to pay to ensure reproducibility or is there another way?Andryc
can't think of any better way to achieve it...Anomalous

I suspect that you are getting the same set of values for the seed, but in a different order depending on your partitioning, which is influenced by how the data is distributed when it is read from disk; there could also be more or less data each time. But I have not seen your actual code.

The rand function generates the same random data for a given seed (what would be the point of the seed otherwise?), and each partition gets a slice of it. If you look at the example, you should be able to guess the pattern!

Here is an example with two dataframes of different cardinality. You can see that the seed gives the same values or a superset of them. So ordering and partitioning play a role, in my opinion.

from pyspark.sql.functions import col, rand

df1 = spark.range(1, 5).select(col("id").cast("double"))
df1 = df1.withColumn("rand_index", rand(seed=7))
df1.show()

# Number of partitions, then the number of rows in each partition
print(df1.rdd.getNumPartitions())
print('Partitioning distribution: ' + str(df1.rdd.glom().map(len).collect()))

returns:

+---+-------------------+
| id|         rand_index|
+---+-------------------+
|1.0|0.06498948189958098|
|2.0|0.41371264720975787|
|3.0|0.12030715258495939|
|4.0| 0.2731073068483362|
+---+-------------------+

8 partitions & Partitioning distribution: [0, 1, 0, 1, 0, 1, 0, 1]

The same again with more data:

...
df1 = spark.range(1, 10).select(col("id").cast("double"))
...

returns:

+---+-------------------+
| id|         rand_index|
+---+-------------------+
|1.0| 0.9147159860432812|
|2.0|0.06498948189958098|
|3.0| 0.7069655052310547|
|4.0|0.41371264720975787|
|5.0| 0.1982919638208397|
|6.0|0.12030715258495939|
|7.0|0.44292918521277047|
|8.0| 0.2731073068483362|
|9.0| 0.7784518091224375|
+---+-------------------+

8 partitions & Partitioning distribution: [1, 1, 1, 1, 1, 1, 1, 2]

You can see that 4 random values are common to both results, whether within one Spark session or across sessions.
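
The pattern in the two outputs above is consistent with each partition drawing from its own generator, seeded from the given seed plus the partition index. As far as I understand, that is how Spark's rand behaves, but treat it as an assumption. Below is a minimal pure-Python sketch of that idea. `simulate_rand` is a hypothetical helper, and `random.Random` only stands in for Spark's internal generator, so the numbers will not match the tables above; only the pattern is the point.

```python
import random

def simulate_rand(partition_sizes, seed=7):
    """Return one list of draws per partition, each from its own generator."""
    draws = []
    for index, size in enumerate(partition_sizes):
        # Assumption: one generator per partition, seeded with seed + partition index.
        rng = random.Random(seed + index)
        draws.append([rng.random() for _ in range(size)])
    return draws

# The two partitioning distributions reported above.
small = simulate_rand([0, 1, 0, 1, 0, 1, 0, 1])   # 4 rows
large = simulate_rand([1, 1, 1, 1, 1, 1, 1, 2])   # 9 rows

# The first value drawn in partitions 1, 3, 5 and 7 is the same in both runs,
# even though it ends up attached to different ids.
for i in (1, 3, 5, 7):
    print(i, small[i][0] == large[i][0])
```

Under this model, the value a row receives depends on which partition it lands in and on its position inside that partition, not on its id, which is why a change in partitioning changes the assignment.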

Tortricid answered 30/11, 2019 at 19:25 Comment(3)
Thank you so much for answering!! that makes sense. In my specific use-case, i have ids for each row where i append the random values. As your example shows and thats what my test also revealed, an id is not getting reproducible random value assigned to it. You explained the cause as having to do with "partitioning" scheme and it was my hunch too.Andryc
Now the question stands - is there a way to ensure an id gets same random value (how to tweak partitioning for that) across spark sessions? is there a different way to approach the problem which I am missing? Guess i should've specified my initial SO post/question clearly now :( sorry for confusionAndryc
I think I answered the question. There is no way to do that, as you cannot control the order of the data when the cardinality differs. What you see is that at least one rand entry is allocated per partition, even if the partition is empty, in which case the entry is skipped and not used.Tortricid

I know it's a bit late, but have you considered hashing deterministic values such as IDs or dates instead of using the built-in random functions? I'm running into a similar issue, and I believe my problem can be solved using, for example, xxhash64, which is a built-in PySpark hash function. You can then use the last few digits, or normalize if you know the total range of the hash values, which I couldn't find in its documentation.
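
A rough sketch of the hashing idea, under stated assumptions. `hashed_uniform` is a hypothetical pure-Python helper that shows the principle without a cluster: the value depends only on the id and a salt, so partitioning and session restarts do not matter. `with_hashed_rand` is a hypothetical PySpark counterpart that assumes Spark 3.0+ (where xxhash64 is available) and a dataframe with an id column; the column name `rand_index`, the salt, and the bucket count are illustrative choices. The two functions use different hashes, so their values differ from each other.

```python
import hashlib

def hashed_uniform(row_id, salt=7, buckets=1_000_000):
    """Map an id to a repeatable value in [0, 1) using a stable hash."""
    # hashlib digests are stable between interpreter runs, unlike
    # Python's built-in hash() for strings.
    digest = hashlib.sha256(f"{salt}:{row_id}".encode("utf-8")).digest()
    return (int.from_bytes(digest[:8], "big") % buckets) / buckets

def with_hashed_rand(df, id_col="id", salt=7, buckets=1_000_000):
    """Hypothetical PySpark counterpart; needs a running Spark 3.0+ session."""
    from pyspark.sql import functions as F  # imported lazily on purpose
    # xxhash64 returns a signed 64-bit long; pmod keeps the bucket non-negative,
    # and "/" on integral values yields a double in Spark SQL.
    return df.withColumn(
        "rand_index",
        F.expr(f"pmod(xxhash64({id_col}, {salt}), {buckets}) / {buckets}"),
    )

# Pure-Python usage: the same id always maps to the same value.
print(hashed_uniform(42) == hashed_uniform(42))
```

With a dataframe at hand, the usage would be something like `new_df = with_hashed_rand(my_df)`. Changing the salt plays the role of changing the seed. Taking the hash modulo a bucket count sidesteps the question of the hash's total range, at the cost of limiting the result to that many distinct values.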

Jink answered 27/1, 2022 at 20:37 Comment(0)

Found a nice article; though it is written in Scala, it might help here: Link

Merridie answered 25/1 at 16:0 Comment(0)
