Spark: Cannot evaluate expression: lag of a window expression

I'm trying to perform a large number of operations on a dataframe from a Cassandra table and then save the result in another table. One of these operations is as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, sum}
val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", lag(sum(col("temp1")).over(leadWindow), 2, 0))

When I run my job, I get an exception saying that the lag expression cannot be evaluated:

2018-10-08 12:02:22 INFO  Cluster:1543 - New Cassandra host /127.0.0.1:9042 added
    2018-10-08 12:02:22 INFO  CassandraConnector:35 - Connected to Cassandra cluster: Test Cluster
    2018-10-08 12:02:23 INFO  CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
    2018-10-08 12:02:23 INFO  CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
    Exception in thread "main" java.lang.UnsupportedOperationException: Cannot evaluate expression: lag(input[43, bigint, true], 2, 0)
            at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:258)
            at org.apache.spark.sql.catalyst.expressions.OffsetWindowFunction.doGenCode(windowExpressions.scala:326)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
            at org.apache.spark.sql.catalyst.expressions.Add.doGenCode(arithmetic.scala:174)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
            at org.apache.spark.sql.catalyst.expressions.BinaryComparison.doGenCode(predicates.scala:513)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.And.doGenCode(predicates.scala:397)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:202)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:201)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.immutable.List.foreach(List.scala:381)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
            at scala.collection.immutable.List.map(List.scala:285)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen.doGenCode(conditionalExpressions.scala:201)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
            at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
            at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
            at scala.collection.AbstractTraversable.map(Traversable.scala:104)
            at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
            at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
            at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
            at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
            at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
            at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
            at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
            at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2975)
            at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2973)
            at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:76)
            at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:86)
            at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
            at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
            at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
            at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
            at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
            at com.test.functions.package$ChecksFunctions.appendToTable(package.scala:66)
            at com.test.TestFromCassandra$.main(TestFromCassandra.scala:66)
            at com.test.TestFromCassandra.main(TestFromCassandra.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    2018-10-08 12:02:31 INFO  CassandraConnector:35 - Disconnected from Cassandra cluster: Test Cluster

Line 130 of the TestFromCassandra file is the call to the save() function. I didn't find any similar issue on Stack Overflow.

Does anyone know why I'm encountering this exception? Does the lag function have any limitation when combined with a rolling sum?

EDIT: I found a similar issue on Spark's Jira. There seems to be a bug when a filter references a window function, and since the Cassandra connector filters dataframes on the primary-key columns (with an IsNotNull predicate) before saving them, that might be what causes the exception. Is there a way to perform this operation while avoiding this bug, but without using an aggregation function? Or does someone know how to fix it?

EDIT 2: I also tried to store my dataframe using a foreach writer and the connector's withSessionDo function, but I still get the same exception. Has no one ever encountered this issue?

EDIT 3: I found another way to achieve the operation I wanted:

val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").desc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", sum(col("temp1")).over(leadWindow))

The problem wasn't due to a filter. It seems it's simply not possible to apply the lag function to a window expression.
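For what it's worth, an equivalent formulation that keeps the ascending order is a trailing frame. This is a sketch against the question's schema; note that on the first two rows of each partition it yields a partial sum, where lag's default value would have yielded 0:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Trailing frame over ascending timestamps: at each row this sums the
// current row and the two preceding ones, which is exactly the value
// that lagging the forward-looking rolling sum by 2 would have produced.
val trailingWindow = Window
  .partitionBy(col("id"))
  .orderBy(col("timestamp").asc)
  .rowsBetween(-2, Window.currentRow)

df.withColumn("lead1", sum(col("temp1")).over(trailingWindow))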

Philips answered 1/10, 2018 at 14:20

I ran into the same problem, and then I noticed you are using the over function inside lag (same as me). I changed it to something like this:

df.withColumn("lag1", lag(sum(col("temp1")), 2, 0).over(lagWindow))
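For reference, a minimal sketch of how lagWindow could be defined here (the name is taken from the snippet above): lag with an offset of 2 imposes its own required frame of exactly (-2, -2), so the window must not declare a frame of its own.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// No explicit rowsBetween: lag(..., 2) supplies its own required frame
// (-2, -2), and declaring any other frame on the window raises the
// "must match the required frame" AnalysisException quoted in the
// comments below.
val lagWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc)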

Jablonski answered 9/10, 2018 at 11:07
Thank you for your proposal, but I get a new AnalysisException: Window Frame specifiedwindowframe(RowFrame, currentrow$(), 2) must match the required frame specifiedwindowframe(RowFrame, -2, -2). Also, what I want to achieve is to shift the result of a 2-row rolling sum by 2 rows. – Philips
I would say that is related to your usage of the rowsBetween function. You are using lag to sum the last two rows (counting from the current row) and then rowsBetween to range over the next two rows (also counting from the current row). They won't match. This is a guess, as I am not very experienced with this. – Jablonski

I am seeing the same error. Although there is a workaround for this problem, Spark should fix this. I believe you will hit this issue with any window function, not just lag. The reason, I believe, is that Spark tries to run code generation on the filter, but window functions are not code-generable. A workaround is to create a column with the window expression and use that column in the filter.
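A minimal sketch of that workaround, assuming the question's schema (id, timestamp, temp1); the rollingSum column name is illustrative:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, sum}

// Step 1: materialize the rolling sum as an ordinary column.
val sumWindow = Window
  .partitionBy(col("id"))
  .orderBy(col("timestamp").asc)
  .rowsBetween(Window.currentRow, 2)
val withSum = df.withColumn("rollingSum", sum(col("temp1")).over(sumWindow))

// Step 2: lag the materialized column over a frame-less window. A later
// filter (such as the connector's IsNotNull predicate) now sees a plain
// column rather than an unevaluable window expression.
val lagWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc)
val result = withSum.withColumn("lead1", lag(col("rollingSum"), 2, 0).over(lagWindow))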

Foreground answered 4/10, 2018 at 18:17
