How can I add a timestamp as an extra column to my DataFrame?
Hi all,

I have an easy question for you all. I have an RDD, created from Kafka streaming using the createStream method. Now I want to add a timestamp as a value to this RDD before converting it into a DataFrame. I tried adding a value to the DataFrame using withColumn(), but it returns this error:

val topicMaps = Map("topic" -> 1)
val now = java.util.Calendar.getInstance().getTime()

val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

messages.foreachRDD(rdd => {
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  val dataframe = sqlContext.read.json(rdd.map(_._2))
  val d = dataframe.withColumn("timeStamp_column", dataframe.col("now"))
})

The failing line and the exception it throws:

val d = dataframe.withColumn("timeStamp_column", dataframe.col("now"))

org.apache.spark.sql.AnalysisException: Cannot resolve column name "now" among (action, device_os_ver, device_type, event_name, item_name, lat, lon, memberid, productUpccd, tenantid);
  at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:15

I understand that DataFrames cannot be altered because they are immutable, but RDDs are immutable as well. So what is the best way to do this? How do I add a value to the RDD (i.e. add a timestamp to an RDD dynamically)?

Beamy answered 9/1, 2017 at 8:58 Comment(2)
Where have you defined the dataframe c? Can you add its schema? – Lithosphere
Sorry, c is the dataframe. Let me correct it; I was in a hurry. – Beamy

Try the current_timestamp function.

import org.apache.spark.sql.functions.current_timestamp

df.withColumn("time_stamp", current_timestamp())
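If you also need the timestamp as a string rather than a TimestampType column (something several of the comments below ask about), one sketch is to cast the column or format it explicitly; the DataFrame and column names here are illustrative:

```scala
import org.apache.spark.sql.functions.{current_timestamp, date_format}

// Cast the timestamp to its default string representation:
val asString = df.withColumn("time_stamp", current_timestamp().cast("string"))

// Or control the output format explicitly with date_format:
val formatted = df.withColumn("time_stamp",
  date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
```

Both variants produce a StringType column instead of a TimestampType one.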
False answered 4/4, 2018 at 18:46 Comment(6)
I got the solution written above, but how can I change it to a string? I tried df.withColumn("time_stamp", lit(current_timestamp())) and df.withColumn("time_stamp", lit(current_timestamp()).cast(string)); I tried casting as string but none of them work for me. – Monocyclic
@VivekKumar you should use the SQL data types, not the Scala string type: org.apache.spark.sql.types.DataTypes.StringType – False
The code below worked for me: withColumn("curr_date", lit(current_timestamp().cast(StringType))) – Steak
error: not found: value lit, error: not found: value current_timestamp. If this needs an imported library, please include it in the code. – Frumenty
I'm wondering whether, with this approach, each executor might have a different timestamp (since it will be calculated locally on each executor), or whether the value will be the same across all executors? – Manual
Yes, you are correct Soheil... the timestamp should be calculated first and then passed as a value, otherwise there will indeed be differences. – Forsterite

To add a new column with a constant value such as a timestamp, you can use the lit function:

import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis))
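Note that lit(System.currentTimeMillis) bakes in the driver's clock at the moment the expression is evaluated, and the raw Long lands in a numeric column. If you want an actual timestamp column whose value is captured once on the driver and is identical for every row, one sketch (variable and column names are illustrative) is:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.functions.lit

// Capture the wall-clock time once, on the driver...
val loadTime = new Timestamp(System.currentTimeMillis)

// ...then embed it as a constant literal, so every row in every
// partition carries the identical timestamp value.
val newDF = oldDF.withColumn("timeStamp_column", lit(loadTime))
```

Because the value is a plain literal in the plan, executors never compute it locally, which sidesteps any clock skew between workers.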
Phylloquinone answered 18/8, 2017 at 10:55 Comment(4)
Hi Javier, this code is giving me a headache; sometimes it prints the schema and sometimes it throws java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221) at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfu – Beamy
Hi Jack, this is strange; what version of Spark are you using? This is an example tested in spark-shell 1.6.3: import org.apache.spark.sql.functions._; val oldDF = sc.parallelize(Seq((1,1),(2,1),(3,1),(4,1),(5,1))).toDF; oldDF.show; val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis)); newDF.show. I'm also using Spark 2.1 and it works there too. – Sharpfreeze
Yes, it works fine in the spark shell, but it throws the above error when using this code in a Spark Streaming application. I'm using Spark 1.6.1 with Java 1.7. – Beamy
Do I need to import any other package to make it work? – Beamy

This works for me. I usually perform a write after this.

import org.apache.spark.sql.functions.current_timestamp

val d = dataframe.withColumn("SparkLoadedAt", current_timestamp())
Shotputter answered 14/2, 2019 at 6:14 Comment(2)
error: not found: value current_timestamp. What am I missing? I've been trying to get Databricks Scala to do this for hours. – Frumenty
@Nick.McDermaid, you will have to import the function: import org.apache.spark.sql.functions.current_timestamp – Zusman

In Scala/Databricks:

import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("Timestamp",current_timestamp())


Cashier answered 24/10, 2019 at 18:30 Comment(2)
There are other answers that address the OP's question, and they were posted some time ago. When posting an answer, see How do I write a good answer? Please make sure you add either a new solution or a substantially better explanation, especially when answering older questions. – Amero
See my other comments... this is the only actual code that doesn't throw the error error: not found: value current_timestamp, so in my opinion this should be the answer, as the other code doesn't function. – Frumenty

I see in the comments that some folks are having trouble converting the timestamp to a string. Here is a way to do that using the Spark 3 datetime pattern format:

import org.apache.spark.sql.functions._

val d = dataframe
  .withColumn("timeStamp_column", date_format(current_timestamp(), "y-M-d'T'H:m:sX"))
Raddled answered 14/10, 2021 at 15:52 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.