Spark SQL Row_number() PartitionBy Sort Desc

I've successfully created a row_number() with partitionBy() in Spark using Window, but would like to sort it descending instead of the default ascending. Here is my working code:

from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window

(
    data_cooccur
    .select(
        "driver",
        "also_item",
        "unit_count",
        F.rowNumber().over(
            Window
            .partitionBy("driver")
            .orderBy("unit_count")
        ).alias("rowNum")
    )
    .show()
)

That gives me this result:

+------+---------+----------+------+
|driver|also_item|unit_count|rowNum|
+------+---------+----------+------+
|   s10|      s11|         1|     1|
|   s10|      s13|         1|     2|
|   s10|      s17|         1|     3|
+------+---------+----------+------+

And here I add the desc() to order descending:

(
    data_cooccur
    .select(
        "driver",
        "also_item",
        "unit_count",
        F.rowNumber().over(
            Window
            .partitionBy("driver")
            .orderBy("unit_count")
            .desc()
        ).alias("rowNum")
    )
    .show()
)

And get this error:

> AttributeError: 'WindowSpec' object has no attribute 'desc'

What am I doing wrong here?

Wynellwynn answered 6/2, 2016 at 22:17 Comment(1)
On my PySpark (2.2.0) I have to use row_number instead of rowNumber. — Philipphilipa

desc should be applied to a column, not a window definition. You can use either a method on a column:

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

row_number().over(
    Window.partitionBy("driver").orderBy(col("unit_count").desc())
)

or a standalone function:

from pyspark.sql.functions import desc, row_number
from pyspark.sql.window import Window

row_number().over(
    Window.partitionBy("driver").orderBy(desc("unit_count"))
)
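To see what either variant computes, here is a plain-Python sketch of the same row_number()-over-partition logic (no Spark required; the toy rows mirror the question's driver/unit_count columns, and the helper name is ours, not a Spark API):

```python
from itertools import groupby
from operator import itemgetter

rows = [
    {"driver": "s10", "also_item": "s11", "unit_count": 1},
    {"driver": "s10", "also_item": "s13", "unit_count": 2},
    {"driver": "s10", "also_item": "s17", "unit_count": 3},
]

def row_number_desc(rows, partition_key, order_key):
    """Emulate row_number() over (partition by ... order by ... desc)."""
    out = []
    rows = sorted(rows, key=itemgetter(partition_key))
    for _, group in groupby(rows, key=itemgetter(partition_key)):
        # Within each partition, sort descending and number from 1.
        ordered = sorted(group, key=itemgetter(order_key), reverse=True)
        for i, row in enumerate(ordered, start=1):
            out.append({**row, "rowNum": i})
    return out

numbered = row_number_desc(rows, "driver", "unit_count")
# The row with the highest unit_count gets rowNum 1.
```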
Watercress answered 7/2, 2016 at 3:35 Comment(1)
Strange that the DataFrame orderBy is different from the window orderBy, as one accepts an ascending flag and the other doesn't. — Airport

Alternatively, you can use SQL directly in Spark SQL (the DataFrame must first be registered as a temporary view):

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master('local[*]')\
    .appName('Test')\
    .getOrCreate()

# Register the DataFrame so it is visible to spark.sql()
data_cooccur.createOrReplaceTempView("data_cooccur")

spark.sql("""
    select driver
        ,also_item
        ,unit_count
        ,ROW_NUMBER() OVER (PARTITION BY driver ORDER BY unit_count DESC) AS rowNum
    from data_cooccur
""").show()
Taxis answered 15/5, 2019 at 18:31 Comment(0)
from pyspark.sql.functions import desc

df.orderBy(desc("col"))
Farrah answered 14/11, 2022 at 12:3 Comment(0)

Update: I tried looking into this further, and it does not actually work (in fact it throws an error). The reason it appeared to work is that I had this code after a call to display() in Databricks, and code after the display() call never runs. The orderBy() on a DataFrame and the orderBy() on a window are not the same thing. I will keep this answer up just as negative confirmation.

As of PySpark 2.4 (and probably earlier), simply adding the keyword ascending=False to the orderBy call works for me.

Ex.

personal_recos.withColumn(
    "row_number",
    F.row_number().over(
        Window.partitionBy("COLLECTOR_NUMBER").orderBy("count", ascending=False)
    ),
)

and

personal_recos.withColumn(
    "row_number",
    F.row_number().over(
        Window.partitionBy("COLLECTOR_NUMBER").orderBy(F.col("count").desc())
    ),
)

seem to give me the same behaviour.

Baty answered 2/8, 2019 at 14:0 Comment(0)
from pyspark.sql import functions as F

my_new_df = df.select(df["STREET NAME"]).distinct()

# Count the rows in my_new_df
print("\nThere are %d rows in the my_new_df DataFrame.\n" % my_new_df.count())

# Add a ROW_ID
my_new_df = my_new_df.withColumn("ROW_ID", F.monotonically_increasing_id())

# Show the rows with the 10 highest IDs, sorted descending
my_new_df.orderBy(my_new_df.ROW_ID.desc()).show(10)
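The same distinct-plus-ID pattern, sketched in plain Python for reference. (Note one difference: the IDs below are dense 0, 1, 2, ..., whereas Spark's monotonically_increasing_id only guarantees increasing, not consecutive, values.)

```python
# Toy data standing in for the "STREET NAME" column.
street_names = ["MAIN ST", "OAK AVE", "MAIN ST", "ELM ST", "OAK AVE"]

# Distinct values, preserving first-seen order (dict keys stay ordered).
distinct = list(dict.fromkeys(street_names))

# Attach a row id, then sort by id descending, as the answer does.
with_ids = [{"STREET NAME": name, "ROW_ID": i} for i, name in enumerate(distinct)]
top = sorted(with_ids, key=lambda r: r["ROW_ID"], reverse=True)
```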
Craner answered 23/6, 2022 at 20:35 Comment(0)

Another option, similar to @zero333's col option, is to sort on the column taken from the DataFrame itself.

(
    data_cooccur
    .select(
        "driver",
        "also_item",
        "unit_count",
        F.rowNumber().over(
            Window
            .partitionBy("driver")
            .orderBy(data_cooccur["unit_count"].desc())
        ).alias("rowNum")
    )
    .show()
)
Exaltation answered 13/10, 2022 at 17:56 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.