I read data from a CSV file, but it doesn't have an index.
I want to add a column numbered from 1 up to the number of rows.
What should I do? Thanks. (Scala)
With Scala you can use:
import org.apache.spark.sql.functions._
df.withColumn("id",monotonicallyIncreasingId)
You can refer to this example and the Scala docs.
With Pyspark you can use:
from pyspark.sql.functions import monotonically_increasing_id
df_index = df.select("*").withColumn("id", monotonically_increasing_id())
monotonicallyIncreasingId has been deprecated. The corresponding function is, as in the PySpark example, monotonically_increasing_id(). – Bilabiate
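For reference, a minimal sketch of the same Scala answer using the non-deprecated function (assuming df is the DataFrame read from the CSV):
import org.apache.spark.sql.functions.monotonically_increasing_id

// monotonically_increasing_id() replaces the deprecated monotonicallyIncreasingId
val dfWithId = df.withColumn("id", monotonically_increasing_id())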
monotonically_increasing_id - The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
"I want to add a column from 1 to row's number."
Let's say we have the following DataFrame:
+--------+-------------+-------+
| userId | productCode | count |
+--------+-------------+-------+
|     25 |        6001 |     2 |
|     11 |        5001 |     8 |
|     23 |         123 |     5 |
+--------+-------------+-------+
To generate the IDs starting from 1:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.orderBy("count")
val result = df.withColumn("index", row_number().over(w))
This would add an index column ordered by increasing value of count.
+--------+-------------+-------+-------+
| userId | productCode | count | index |
+--------+-------------+-------+-------+
|     25 |        6001 |     2 |     1 |
|     23 |         123 |     5 |     2 |
|     11 |        5001 |     8 |     3 |
+--------+-------------+-------+-------+
Is there a way to start row_number() from 0? – Azrael
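As a hedged sketch, reusing the window w from the answer above: row_number() always starts at 1, so subtracting 1 gives a 0-based index.
// row_number() starts at 1; subtract 1 for a 0-based index
val zeroBased = df.withColumn("index", row_number().over(w) - 1)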
How to get a sequential id column id[1, 2, 3, 4...n]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window
df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
Note that row_number() starts at 1, so subtract 1 if you want a 0-indexed column.
Note: the approaches above don't give a sequential number, only an increasing id. A simple way to do that and ensure the order of the indexes is zipWithIndex, as shown below.
Sample data.
+-------------------+
| Name|
+-------------------+
| Ram Ghadiyaram|
| Ravichandra|
| ilker|
| nick|
| Naveed|
| Gobinathan SP|
|Sreenivas Venigalla|
| Jackela Kowski|
| Arindam Sengupta|
| Liangpi|
| Omar14|
| anshu kumar|
+-------------------+
package com.example
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}
/**
* DistributedDataIndex : Program to index a DataFrame with zipWithIndex and monotonically_increasing_id
*/
object DistributedDataIndex extends App with Logging {
val spark = builder
.master("local[*]")
.appName(this.getClass.getName)
.getOrCreate()
import spark.implicits._
val df = spark.sparkContext.parallelize(
Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
, "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
)).toDF("Name")
df.show
logInfo("addColumnIndex here")
// Add index now...
val df1WithIndex = addColumnIndex(df)
.withColumn("monotonically_increasing_id", monotonically_increasing_id)
df1WithIndex.show(false)
/**
* Add Column Index to dataframe to each row
*/
def addColumnIndex(df: DataFrame) = {
spark.sqlContext.createDataFrame(
df.rdd.zipWithIndex.map {
case (row, index) => Row.fromSeq(row.toSeq :+ index)
},
// Create schema for index column
StructType(df.schema.fields :+ StructField("index", LongType, false)))
}
}
Result :
+-------------------+-----+---------------------------+
|Name |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram |0 |0 |
|Ravichandra |1 |8589934592 |
|ilker |2 |8589934593 |
|nick |3 |17179869184 |
|Naveed |4 |25769803776 |
|Gobinathan SP |5 |25769803777 |
|Sreenivas Venigalla|6 |34359738368 |
|Jackela Kowski |7 |42949672960 |
|Arindam Sengupta |8 |42949672961 |
|Liangpi |9 |51539607552 |
|Omar14 |10 |60129542144 |
|anshu kumar |11 |60129542145 |
+-------------------+-----+---------------------------+
monotonically_increasing_id as a column – Inkberry
As Ram said, zipWithIndex is better than monotonically_increasing_id if you need consecutive row numbers. Try this (in a PySpark environment):
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
new_schema = StructType(original_dataframe.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = original_dataframe.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
where original_dataframe is the DataFrame you have to add the index to, and row_with_index is the new Row schema containing the index column, which you can write as
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
Here, calendar_date, year_week_number, year_period_number and realization were the columns of my original DataFrame. You can replace these names with the names of your own columns. index is the new column name you add for the row numbers.
indexed = (zipped_rdd.map(lambda ri: Row(*list(ri[0]) + [ri[1]])).toDF(new_schema)) – Triglyph
If you require a unique sequence number for each row, I have a slightly different approach: a static column is added and is used to compute the row number over that column.
val srcData = spark.read.option("header","true").csv("/FileStore/sample.csv")
srcData.show(5)
+--------+--------------------+
| Job| Name|
+--------+--------------------+
|Morpheus| HR Specialist|
| Kayla| Lawyer|
| Trisha| Bus Driver|
| Robert|Elementary School...|
| Ober| Judge|
+--------+--------------------+
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val srcDataModf = srcData.withColumn("sl_no", lit("1"))
val windowSpecRowNum = Window.partitionBy("sl_no").orderBy("sl_no")
srcDataModf.withColumn("row_num", row_number().over(windowSpecRowNum)).drop("sl_no").select("row_num", "Name", "Job").show(5)
+-------+--------------------+--------+
|row_num| Name| Job|
+-------+--------------------+--------+
| 1| HR Specialist|Morpheus|
| 2| Lawyer| Kayla|
| 3| Bus Driver| Trisha|
| 4|Elementary School...| Robert|
| 5| Judge| Ober|
+-------+--------------------+--------+
Don't use monotonically_increasing_id() on its own; it will become a nightmare for you, since data skew can happen. It happened to me.
The monotonically_increasing_id() function generates monotonically increasing 64-bit integers. The generated id numbers are guaranteed to be increasing and unique, but they are not guaranteed to be consecutive.
from pyspark.sql.functions import *
df_with_increasing_id = df.withColumn("monotonically_increasing_id", monotonically_increasing_id())
df_with_increasing_id.show()
we get the following results:
+-----+---+---------------------------+
| Name|Age|monotonically_increasing_id|
+-----+---+---------------------------+
|Alice| 10| 8589934592|
|Susan| 12| 25769803776|
+-----+---+---------------------------+
The row_number() function generates numbers that are consecutive.
Combine this with monotonically_increasing_id() to generate two columns of numbers that can be used to identify data entries.
from pyspark.sql.functions import *
from pyspark.sql.window import *
window = Window.orderBy(col('monotonically_increasing_id'))
df_with_consecutive_increasing_id = df_with_increasing_id.withColumn('increasing_id', row_number().over(window))
df_with_consecutive_increasing_id.show()
we get the following results:
+-----+---+---------------------------+-------------+
| Name|Age|monotonically_increasing_id|increasing_id|
+-----+---+---------------------------+-------------+
|Alice| 10| 8589934592| 1|
|Susan| 12| 25769803776| 2|
+-----+---+---------------------------+-------------+
For SparkR:
(Assuming sdf is some sort of spark data frame)
sdf<- withColumn(sdf, "row_id", SparkR:::monotonically_increasing_id())