Why do year and month functions result in long overflow in Spark?

I'm trying to create year and month columns from a column named logtimestamp (of type TimestampType) in Spark. The data source is Cassandra. I am using spark-shell to perform these steps; here is the code I have written:

import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.types._
// year, month and col come from here (spark-shell imports it automatically)
import org.apache.spark.sql.functions.{col, month, year}

var logsDF = spark.read.cassandraFormat("tableName", "cw").load()
var newlogs = logsDF.withColumn("year", year(col("logtimestamp")))
  .withColumn("month", month(col("logtimestamp")))
newlogs.write.cassandraFormat("tableName_v2", "cw")
  .mode("Append").save()

But these steps do not succeed; I end up with the following error:

java.lang.ArithmeticException: long overflow
    at java.lang.Math.multiplyExact(Math.java:892)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:205)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:166)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:327)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:325)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:252)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:242)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:426)
    at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:34)
    at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:21)
    at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.$anonfun$getIterator$2(CassandraScanPartitionReaderFactory.scala:110)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
    at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.next(CassandraScanPartitionReaderFactory.scala:66)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I thought it was something to do with null values in the table, so I ran the following:

scala> logsDF.filter("logtimestamp is null").show()

But this too gave the same long overflow error.

How come there is an overflow in Spark but not in Cassandra, when both store timestamps in 8 bytes? What could be the issue here, and how do I extract the year and month from the timestamp correctly?

Iasis answered 2/11, 2021 at 11:28 Comment(6)
Could you share an example of values from logtimestamp column?Gordie
@Gabip Yes, here are some examples of logtimestamp (as they are shown in Spark) - 2021-03-04 10:29:59.311, 2021-03-04 10:29:59.014, 2021-05-03 21:29:56.699. Their type is TimestampType according to logsDF.dtypesIasis
Could issues.apache.org/jira/browse/SPARK-35679 be related?Kith
@Kith Oh, could this be the issue? But I'm using Spark 3.1.2 with Scala 2.12.10. The link doesn't mention anything about 3.1.2Iasis
It mentions the issue is fixed in 3.1.3Kith
@Kith Upgraded to 3.2.0 but the issue still persistsIasis

Turns out one of the Cassandra tables had a timestamp value that was greater than the highest value allowed by Spark, but not large enough to overflow in Cassandra. The timestamp had been manually edited to get around the upserting that Cassandra does by default, and this led to some very large values being created during development. I ran a Python script to find this out.
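
For context on "the highest value allowed by Spark": Cassandra stores a timestamp as a signed 64-bit count of milliseconds since the epoch, while Spark's TimestampType is held internally as a signed 64-bit count of microseconds. The failing frame in the stack trace, DateTimeUtils.millisToMicros, multiplies the millisecond value by 1,000 with Math.multiplyExact, so any Cassandra timestamp above Long.MaxValue / 1000 milliseconds overflows on read. A minimal spark-shell sketch of that boundary (plain JDK arithmetic, no table access):

import java.time.Instant

// Spark keeps TimestampType as microseconds since the epoch in a signed Long,
// so the largest millisecond value it can convert is Long.MaxValue / 1000.
val maxMillisSparkCanRead = Long.MaxValue / 1000L          // 9223372036854775 ms
println(Instant.ofEpochMilli(maxMillisSparkCanRead))       // roughly the year 294247

// The same multiplication Spark performs in DateTimeUtils.millisToMicros:
// one millisecond past the limit throws the exception from the stack trace.
println(scala.util.Try(Math.multiplyExact(maxMillisSparkCanRead + 1, 1000L)))
// => Failure(java.lang.ArithmeticException: long overflow)

Anything Cassandra can store beyond that boundary is still readable by Cassandra itself but not convertible by Spark, which is why the overflow shows up only on the Spark side.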

Iasis answered 3/11, 2021 at 11:14 Comment(4)
Nice find! Can you add a sample value that was causing this?Kith
@Kith Unfortunately I deleted those rows. It was just a very large number. While Cassandra showed other proper timestamps in yyyy-MM-hh... format, it showed the number as it is.Iasis
Aaahhh niceties of schema-on-read?Kith
@Kith Yes, I think that is what it was. Tried a lot to disable it for the Cassandra connector and use BigInt for the timestamp while debugging, but was unsuccessful.Iasis
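
The Python script mentioned in the answer wasn't shared. For anyone hitting the same error, here is a rough sketch of the kind of check it describes, querying Cassandra directly with the DataStax Java driver 4.x so that Spark's millis-to-micros conversion never runs. The keyspace and table names are the placeholders from the question, not a real schema, and the default contact point is assumed:

// Sketch only: assumes the DataStax Java driver 4.x on the classpath and a node
// reachable on the default contact point (127.0.0.1:9042).
import com.datastax.oss.driver.api.core.CqlSession
import java.time.Instant
import scala.collection.JavaConverters._   // Scala 2.12; scala.jdk.CollectionConverters on 2.13

// Largest instant Spark can convert: Long.MaxValue microseconds since the epoch.
val sparkMaxInstant = Instant.ofEpochMilli(Long.MaxValue / 1000L)

val session = CqlSession.builder().build()
try {
  // In a real run, also select the primary key columns so the offending rows can be located;
  // a full-table CQL scan like this is only practical on a reasonably small table.
  val rows = session.execute("SELECT logtimestamp FROM cw.tableName").asScala
  rows
    .map(_.getInstant("logtimestamp"))
    .filter(ts => ts != null && ts.isAfter(sparkMaxInstant))
    .foreach(ts => println(s"out of Spark's range: $ts"))
} finally {
  session.close()
}

Rows whose timestamp lies past that instant are exactly the ones that make the Spark read throw, so they can be fixed or removed (as the answerer ended up doing) before re-running the job.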
