Creating a row number for each row in a PySpark DataFrame using the row_number() function with Spark version 2.2
I have a PySpark DataFrame:

valuesCol = [('Sweden',31),('Norway',62),('Iceland',13),('Finland',24),('Denmark',52)]
df = sqlContext.createDataFrame(valuesCol,['name','id'])
+-------+---+
|   name| id|
+-------+---+
| Sweden| 31|
| Norway| 62|
|Iceland| 13|
|Finland| 24|
|Denmark| 52|
+-------+---+

I wish to add a column to this DataFrame containing the row number (serial number) of each row.

My final output should be:

+-------+---+--------+
|   name| id|row_num |
+-------+---+--------+
| Sweden| 31|       1|
| Norway| 62|       2|
|Iceland| 13|       3|
|Finland| 24|       4|
|Denmark| 52|       5|
+-------+---+--------+

My Spark version is 2.2.

I tried this code, but it doesn't work:

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().orderBy()
df = df.withColumn("row_num", row_number().over(w))
df.show()

I get this error:

AnalysisException: 'Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;'

If I understand correctly, I need to order by some column, but I don't want something like w = Window().orderBy('id') because that will reorder the entire DataFrame.

Can anyone suggest how to achieve the above-mentioned output using the row_number() function?

Seagraves asked 29/10, 2018 at 9:30 Comment(1)
@cph_sto - you may like this also. #41313988Rossi
You should define a column for the ORDER BY clause. If you don't need to order the values, then use a dummy value. Try the code below:

from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window

# Ordering by a constant literal satisfies row_number()'s ORDER BY
# requirement without sorting on any real column.
w = Window().orderBy(lit('A'))
df = df.withColumn("row_num", row_number().over(w))
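Running df.show() on the example DataFrame then gives the output asked for in the question (under the constant ordering all rows end up in a single partition, so the numbering follows the input order here):

df.show()
+-------+---+-------+
|   name| id|row_num|
+-------+---+-------+
| Sweden| 31|      1|
| Norway| 62|      2|
|Iceland| 13|      3|
|Finland| 24|      4|
|Denmark| 52|      5|
+-------+---+-------+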
String answered 29/10, 2018 at 9:33 Comment(10)
Thank you, Sir. It works perfectly. Just a small question: I was missing lit('A'). Can you kindly explain what this part of the code is doing? What is 'A' here, as it doesn't appear in the final output anyway? I will accept it as the answer anyway because it yields the expected output.Seagraves
It is a dummy value. It means nothing; you can write anything instead of 'A'.String
Understood, thanks :) Just one last question: I have seen that row_number() is used along with partitionBy() many times, so if I load data from HDFS and add a column of row numbers, like above, will there be a reshuffle of the partitions? I know that Spark only triggers execution when an action is called and that Catalyst rearranges operations to yield an optimal plan. My query: I think there will be no repartitioning of the data from using row_number() after we load data from HDFS (and before we invoke any action), but I just wanted to seek your perspective!Seagraves
I think it will work. If you don't need to group data and obtain row numbers for each group, there is no need to use the partitionBy clause.String
Perfect solution.Glyptodont
I guess this is still sorting and has a performance impact, right?Creatine
It definitely has an impact since it's sorting. I added this and I can notice a delay (1 TB of data).Benedick
I get this warning: WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. Is there any better way to implement this without getting the warning if I do not need a group or partition? I just want the row number.Diplodocus
@Diplodocus I get the same warning. Were you able to find a way around this?Worldling
I was getting the same warning, then I changed my code to this: w = Window.partitionBy('seller_id').orderBy('seller_id') and used it like this: df.withColumn('row_number', row_number().over(w)).withColumn('total_earned', sum(col('price')).over(w)).where(col('row_number') == 1).select('seller_name', 'total_earned').show()Foraminifer
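For those who want a row number without the single-partition window at all, a commonly used alternative is RDD.zipWithIndex. A minimal sketch, assuming the same df as in the question (the round trip through an RDD has its own cost):

from pyspark.sql.types import LongType, StructField, StructType

# zipWithIndex assigns a sequential 0-based index to every row without
# moving all rows into a single partition.
indexed = df.rdd.zipWithIndex()

# Rebuild the DataFrame with a 1-based row_num column appended.
schema = StructType(df.schema.fields + [StructField("row_num", LongType(), False)])
df = indexed.map(lambda pair: pair[0] + (pair[1] + 1,)).toDF(schema)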
I had a similar problem, but in my case @Ali Yesilli's solution failed, because I was reading multiple input files separately and ultimately unioning them all into a single DataFrame. In this case, the order within a window ordered by a dummy variable proved to be unpredictable.

So to achieve more robust ordering, I used monotonically_increasing_id:

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
# Capture the current order of the unioned DataFrame before windowing.
df = df.withColumn('original_order', monotonically_increasing_id())
df = df.withColumn('row_num', row_number().over(Window.orderBy('original_order')))
df = df.drop('original_order')
Frederiksberg answered 9/5, 2022 at 15:10 Comment(2)
Wouldn't this approach cause: WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation?Fromm
Yes, it will give that warning, but I don't think there's a way to avoid it if you want to do this. It makes sense to me that in order to robustly order things, you need to process them in the same partition.Frederiksberg
# Register the DataFrame as a temporary view and use Spark SQL;
# "order by (select null)" satisfies the ORDER BY requirement without sorting on a real column.
my_data_df.createOrReplaceTempView("my_data")
my_data_indexed_df = spark.sql("select row_number() over (order by (select null)) as row_num, * from my_data")
Angelus answered 13/11, 2023 at 17:45 Comment(0)
