Why do window functions fail with "Window function X does not take a frame specification"?

I'm trying to use Spark 1.4 window functions in PySpark 1.4.1, but I'm mostly getting errors or unexpected results. Here is a very simple example that I think should work:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])

wSpec = Window.orderBy(df.a).rowsBetween(-1,1)

df.select(df.a, func.rank().over(wSpec).alias("rank"))  
    ==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next"))  
    ===>  org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;


wSpec = Window.orderBy(df.a)

df.select(df.a, func.rank().over(wSpec).alias("rank"))
    ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect()

    [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)]

As you can see, when I add the rowsBetween frame specification, neither rank() nor lag()/lead() accepts it: "Window function X does not take a frame specification".

If I omit the rowsBetween frame specification, at least lag()/lead() don't throw exceptions, but they return an unexpected (to me) result: always None. And rank() still doesn't work, just with a different exception.

Can anybody help me to get my window functions right?

UPDATE

All right, this is starting to look like a PySpark bug. I have prepared the same test in pure Spark (Scala, spark-shell):

import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505))
val rdd = sc.parallelize(l).map(i => Row(i._1,i._2))
val schemaString = "a b"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true)))
val df = sqlContext.createDataFrame(rdd, schema)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wSpec = Window.orderBy("a").rowsBetween(-1,1)
df.select(df("a"), rank().over(wSpec).alias("rank"))
    ==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.;

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;


val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
    ====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5])

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
    ====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null])

Even though rowsBetween cannot be applied in Scala either, rank() and lag()/lead() both work as I expect when rowsBetween is omitted.

Cardiganshire answered 3/9, 2015 at 13:14

As far as I can tell there are two different problems. Window frame definitions are simply not supported by Hive GenericUDAFRank, GenericUDAFLag and GenericUDAFLead, so the errors you see are expected behavior.
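
By contrast, plain aggregate functions do accept a frame, so the sliding window itself is fine once the function changes. A minimal sketch (untested on 1.4.1, assuming the same sqlContext and toy data as in the question) of a moving average over the very frame that rank() rejects:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Same toy DataFrame as in the question
l = [(1, 101), (2, 202), (3, 303), (4, 404), (5, 505)]
df = sqlContext.createDataFrame(l, ["a", "b"])

# avg() is a true aggregate, so it does take a rowsBetween frame:
# each row averages b over the previous, current and next rows.
wFramed = Window.partitionBy().orderBy(df.a).rowsBetween(-1, 1)
df.select(df.a, func.avg(df.b).over(wFramed).alias("sliding_avg")).show()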

Regarding the issue with the following PySpark code

wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))

it looks like it is related to my question https://stackoverflow.com/q/31948194/1560062 and should be addressed by SPARK-9978. For now, you can make it work by changing the window definition to this:

wSpec = Window.partitionBy().orderBy(df.a)
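
Putting it together, a minimal sketch of the full workaround (same toy data as in the question; the exact output on 1.4.1 is not verified here):

from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1, 101), (2, 202), (3, 303), (4, 404), (5, 505)]
df = sqlContext.createDataFrame(l, ["a", "b"])

# An empty partitionBy() puts all rows in one partition, which works around
# SPARK-9978; no rowsBetween, because rank/lag/lead carry their own frame
# and reject an explicit one.
wSpec = Window.partitionBy().orderBy(df.a)

df.select(
    df.a,
    func.rank().over(wSpec).alias("rank"),
    func.lag(df.b, 1).over(wSpec).alias("prev"),
    df.b,
    func.lead(df.b, 1).over(wSpec).alias("next"),
).show()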
Echikson answered 3/9, 2015 at 15:11

In PySpark, the following window spec will throw an error when combined with lag():

joined_windowSpec = Window.partitionBy("a_x").orderBy('justDate', 'table_name').rowsBetween(Window.unboundedPreceding, 0)

Error thrown:

pyspark.errors.exceptions.captured.AnalysisException: Cannot specify window frame for lag function.

Whereas if we omit the rowsBetween specification, it doesn't throw an error:

joined_windowSpec = Window.partitionBy("a_x").orderBy('justDate', 'table_name')  ## NO ERROR
Paraselene answered 1/9, 2023 at 3:15
