I'm trying to perform a large number of operations on a dataframe read from a Cassandra table and then save the result in another table. One of these operations is as follows:
val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", lag(sum(col("temp1")).over(leadWindow), 2, 0))
When I run my job, I get an exception saying that the lag operation cannot be evaluated:
2018-10-08 12:02:22 INFO Cluster:1543 - New Cassandra host /127.0.0.1:9042 added
2018-10-08 12:02:22 INFO CassandraConnector:35 - Connected to Cassandra cluster: Test Cluster
2018-10-08 12:02:23 INFO CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
2018-10-08 12:02:23 INFO CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot evaluate expression: lag(input[43, bigint, true], 2, 0)
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:258)
at org.apache.spark.sql.catalyst.expressions.OffsetWindowFunction.doGenCode(windowExpressions.scala:326)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
at org.apache.spark.sql.catalyst.expressions.Add.doGenCode(arithmetic.scala:174)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
at org.apache.spark.sql.catalyst.expressions.BinaryComparison.doGenCode(predicates.scala:513)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.And.doGenCode(predicates.scala:397)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:202)
at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:201)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.expressions.CaseWhen.doGenCode(conditionalExpressions.scala:201)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2975)
at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2973)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:76)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:86)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at com.test.functions.package$ChecksFunctions.appendToTable(package.scala:66)
at com.test.TestFromCassandra$.main(TestFromCassandra.scala:66)
at com.test.TestFromCassandra.main(TestFromCassandra.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-10-08 12:02:31 INFO CassandraConnector:35 - Disconnected from Cassandra cluster: Test Cluster
Line 130 of the TestFromCassandra file is the call to the save() function. I didn't find any similar issue on Stack Overflow.
Does anyone know why I'm encountering this exception? Does the lag function have any limitations when combined with a rolling sum?
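For reference, this is how I understand a standalone lag call: lag is itself a window function and takes its own .over(...) on a plain ordered window (minimal sketch; "temp1Shifted" is just a name I made up):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Plain ordered window with no explicit frame; lag applies over it directly.
val w = Window.partitionBy(col("id")).orderBy(col("timestamp").asc)
df.withColumn("temp1Shifted", lag(col("temp1"), 2, 0).over(w))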
EDIT: I found a similar issue on Spark's Jira. There seems to be a bug when a filter is applied after a reference to a window function, and since the Cassandra connector filters dataframes on the primary key columns (using IsNotNull) before saving them, this might be what causes the exception.
Is there a way to perform this operation while avoiding this bug, but without using an aggregation function? Or does someone know how to fix it?
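One workaround I am considering, in case the pushed-down IsNotNull filter really is the trigger: materialize the window result before handing the dataframe to the connector, so the window expression is not code-generated together with the filter. This is an untested sketch (using just the rolling sum for simplicity; the table and keyspace names are placeholders):

// Compute the window column, then force its evaluation before the save.
val withLead = df.withColumn("lead1", sum(col("temp1")).over(leadWindow)).persist()
withLead.count()

withLead.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "target_table", "keyspace" -> "test_ks")) // placeholders
  .mode("append")
  .save()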
EDIT 2: I also tried to store my dataframe using a foreach writer and the connector's withSessionDo function, but I still get the same exception. Has no one ever encountered this issue?
EDIT 3: I found another way to achieve the operation I wanted:
val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").desc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", sum(col("temp1")).over(leadWindow))
The problem wasn't due to a filter. It seems it's simply not possible to apply the lag function to a window expression; Spark complains that the specified frame doesn't match the frame lag requires:
Window Frame specifiedwindowframe(RowFrame, currentrow$(), 2) must match the required frame specifiedwindowframe(RowFrame, -2, -2)
Also, what I want to achieve is to shift the result of a 2-row rolling sum by 2 rows.
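For completeness, a two-step version of the same idea that should satisfy lag's required frame: compute the rolling sum first, then shift it with lag over a plain ordered window. This is an untested sketch; "rollingSum" is just a name I made up:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val orderedWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc)
val sumWindow = orderedWindow.rowsBetween(Window.currentRow, 2)

// Step 1: rolling sum over the current row and the next two rows.
// Step 2: shift that result down by 2 rows; here lag gets its own .over
// on a window with no explicit frame, so it keeps its required (-2, -2) frame.
val result = df
  .withColumn("rollingSum", sum(col("temp1")).over(sumWindow))
  .withColumn("lead1", lag(col("rollingSum"), 2, 0).over(orderedWindow))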