Spark DataFrame: how to add an index column (aka distributed data index)

9

55

I read data from a CSV file, but it doesn't have an index.

I want to add a column numbered from 1 to the number of rows.

What should I do? Thanks. (Scala)

Smash answered 14/4, 2017 at 7:9 Comment(0)
80

With Scala you can use:

import org.apache.spark.sql.functions._ 

df.withColumn("id",monotonicallyIncreasingId)

You can refer to this example and the Scala docs.

With Pyspark you can use:

from pyspark.sql.functions import monotonically_increasing_id 

df_index = df.select("*").withColumn("id", monotonically_increasing_id())
Varipapa answered 14/4, 2017 at 8:36 Comment(5)
I'm wondering why the code you wrote for scala won't work for pyspark. i.e. df.withColumn("id",monotonicallyIncreasingId)Connors
The Scala code worked, thanks. However, I am getting the following warning: "warning: there was one deprecation warning; re-run with -deprecation for details"Phebe
monotonicallyIncreasingId does not guarantee that "id" will be "from 1 to row's number". From the doc: spark.apache.org/docs/latest/api/java/org/apache/spark/sql/… "The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive"Ancohuma
I noticed that too. You can have the id start at 1 by using the following: df.withColumn("id", monotonicallyIncreasingId + 1). It works fine up to a certain level, beyond which the ids are almost 15 digits.Craiova
Starting from Spark 2.0.0 the function monotonicallyIncreasingId has been deprecated. The corresponding function is, as in the case of the PySpark example, monotonically_increasing_id().Bilabiate
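A minimal Scala sketch of the non-deprecated call mentioned in the comments above (assuming the same df as in the answer). The way the id is built also explains the jump to ~15-digit values noted above:

import org.apache.spark.sql.functions.monotonically_increasing_id

// Non-deprecated snake_case variant of monotonicallyIncreasingId.
// The id packs the partition number into the upper 31 bits and the
// record number within the partition into the lower 33 bits, so ids
// are unique and increasing but jump by 2^33 between partitions.
val dfWithId = df.withColumn("id", monotonically_increasing_id())
dfWithId.show(false)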
48

monotonically_increasing_id - The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.

"I want to add a column from 1 to row's number."

Let's say we have the following DF:

+--------+-------------+-------+
| userId | productCode | count |
+--------+-------------+-------+
|     25 |        6001 |     2 |
|     11 |        5001 |     8 |
|     23 |         123 |     5 |
+--------+-------------+-------+

To generate the IDs starting from 1:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.orderBy("count")
val result = df.withColumn("index", row_number().over(w))

This would add an index column ordered by increasing value of count.

+--------+-------------+-------+-------+
| userId | productCode | count | index |
+--------+-------------+-------+-------+
|     25 |        6001 |     2 |     1 |
|     23 |         123 |     5 |     2 |
|     11 |        5001 |     8 |     3 |
+--------+-------------+-------+-------+
Riess answered 14/10, 2017 at 2:56 Comment(2)
The above works for small datasets; if you have data > 100M it might be a problem to finish the job. Speaking from experience.Dorkus
Is it possible to start row_number() from 0?Azrael
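On the two comments above: row_number() always starts at 1, so subtracting 1 gives a 0-based index, and a Window with orderBy but no partitionBy moves all rows through a single partition, which is the likely cause of the slowness on large data. A minimal sketch against the df above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// No partitionBy here, so Spark warns that all data is moved to a
// single partition; fine for small data, costly for very large data.
val w = Window.orderBy("count")
val zeroIndexed = df.withColumn("index", row_number().over(w) - 1)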
26

How to get a sequential id column id[1, 2, 3, 4...n]:

from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

Note that row_number() starts at 1, so subtract 1 if you want a 0-indexed column.

Impi answered 18/6, 2019 at 14:32 Comment(0)
15

NOTE: The approaches above don't give a sequence number, but they do give an increasing id.

A simple way to do that and ensure the order of the indexes is zipWithIndex, as shown below.

Sample data.

+-------------------+
|               Name|
+-------------------+
|     Ram Ghadiyaram|
|        Ravichandra|
|              ilker|
|               nick|
|             Naveed|
|      Gobinathan SP|
|Sreenivas Venigalla|
|     Jackela Kowski|
|   Arindam Sengupta|
|            Liangpi|
|             Omar14|
|        anshu kumar|
+-------------------+

package com.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

/**
  * DistributedDataIndex : Program to index a DataFrame's rows with zipWithIndex
  */
object DistributedDataIndex extends App with Logging {

  val spark = builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
      , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show
  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id)
  df1WithIndex.show(false)

  /**
    * Add an index column to each row of the dataframe
    */
  def addColumnIndex(df: DataFrame) = {
    spark.sqlContext.createDataFrame(
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }
}

Result :

+-------------------+-----+---------------------------+
|Name               |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram     |0    |0                          |
|Ravichandra        |1    |8589934592                 |
|ilker              |2    |8589934593                 |
|nick               |3    |17179869184                |
|Naveed             |4    |25769803776                |
|Gobinathan SP      |5    |25769803777                |
|Sreenivas Venigalla|6    |34359738368                |
|Jackela Kowski     |7    |42949672960                |
|Arindam Sengupta   |8    |42949672961                |
|Liangpi            |9    |51539607552                |
|Omar14             |10   |60129542144                |
|anshu kumar        |11   |60129542145                |
+-------------------+-----+---------------------------+
Inkberry answered 28/9, 2018 at 0:58 Comment(3)
To show the difference between the 2 approaches, I added monotonically_increasing_id as a column.Inkberry
What is the use of the internal.Logging package?Threat
@Threat Spark uses this internally; I used the same for my app.Inkberry
4

As Ram said, zipWithIndex is better than monotonically_increasing_id if you need consecutive row numbers. Try this (PySpark environment):

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

new_schema = StructType(original_dataframe.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = original_dataframe.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))

where original_dataframe is the dataframe you have to add the index to, and row_with_index is the new row structure with the index column, which you can write as

row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)

Here, calendar_date, year_week_number, year_period_number and realization were the columns of my original dataframe. You can replace the names with the names of your columns. index is the new column name you had to add for the row numbers.

Outtalk answered 17/10, 2018 at 9:56 Comment(1)
No need to write the new Row structure (unless you want it for code explicitness) => indexed = (zipped_rdd.map(lambda ri: Row(*list(ri[0]) + [ri[1]])).toDF(new_schema)) Triglyph
0

If you require a unique sequence number for each row, I have a slightly different approach: a static column is added and the row number is computed over that column.

val srcData = spark.read.option("header","true").csv("/FileStore/sample.csv")
srcData.show(5)

+--------+--------------------+
|     Job|                Name|
+--------+--------------------+
|Morpheus|       HR Specialist|
|   Kayla|              Lawyer|
|  Trisha|          Bus Driver|
|  Robert|Elementary School...|
|    Ober|               Judge|
+--------+--------------------+

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val srcDataModf = srcData.withColumn("sl_no", lit("1"))
val windowSpecRowNum = Window.partitionBy("sl_no").orderBy("sl_no")

srcDataModf.withColumn("row_num", row_number().over(windowSpecRowNum)).drop("sl_no").select("row_num", "Name", "Job").show(5)

+-------+--------------------+--------+
|row_num|                Name|     Job|
+-------+--------------------+--------+
|      1|       HR Specialist|Morpheus|
|      2|              Lawyer|   Kayla|
|      3|          Bus Driver|  Trisha|
|      4|Elementary School...|  Robert|
|      5|               Judge|    Ober|
+-------+--------------------+--------+
Otherwise answered 19/5, 2020 at 16:22 Comment(0)
0

Don't use monotonically_increasing_id(); it will become a nightmare for you. Data skew will happen. It happened to me.

Deferential answered 7/4, 2023 at 7:36 Comment(0)
0

The monotonically_increasing_id() function generates monotonically increasing 64-bit integers. The generated id numbers are guaranteed to be increasing and unique, but they are not guaranteed to be consecutive.

from pyspark.sql.functions import *

df_with_increasing_id = df.withColumn("monotonically_increasing_id", monotonically_increasing_id())
df_with_increasing_id.show()

we get the following results:

+-----+---+---------------------------+
| Name|Age|monotonically_increasing_id|
+-----+---+---------------------------+
|Alice| 10|                 8589934592|
|Susan| 12|                25769803776|
+-----+---+---------------------------+

The row_number() function generates numbers that are consecutive.

Combine this with monotonically_increasing_id() to generate two columns of numbers that can be used to identify data entries.

from pyspark.sql.functions import *
from pyspark.sql.window import *

window = Window.orderBy(col('monotonically_increasing_id'))
df_with_consecutive_increasing_id = df_with_increasing_id.withColumn('increasing_id', row_number().over(window))
df_with_consecutive_increasing_id.show()

we get the following results:

+-----+---+---------------------------+-------------+
| Name|Age|monotonically_increasing_id|increasing_id|
+-----+---+---------------------------+-------------+
|Alice| 10|                 8589934592|            1|
|Susan| 12|                25769803776|            2|
+-----+---+---------------------------+-------------+
Occultation answered 21/9, 2023 at 5:50 Comment(0)
-1

For SparkR:

(Assuming sdf is a Spark DataFrame)

sdf <- withColumn(sdf, "row_id", SparkR:::monotonically_increasing_id())

Oviduct answered 2/7, 2020 at 20:52 Comment(0)
