How to transpose dataframe in Spark 1.5 (no pivot operator available)?
Asked Answered
T

3

3

I want to transpose following table using spark scala without Pivot function

I am using Spark 1.5.1 and Pivot function does not support in 1.5.1. Please suggest suitable method to transpose following table:

Customer Day   Sales
1        Mon    12
1        Tue    10
1        Thu    15
1        Fri     2
2        Sun    10
2        Wed     5
2        Thu     4
2        Fri     3

Output table :

Customer Sun Mon Tue Wed Thu Fri
   1      0   12  10   0  15  2
   2     10    0   0   5  4   3

Following code is not working as I am using Spark 1.5.1 and pivot function is available from Spark 1.6:

    var Trans = Cust_Sales.groupBy("Customer").Pivot("Day").sum("Sales")
Tepee answered 25/3, 2016 at 7:26 Comment(0)
L
4

Not sure how efficient that is, but you can use collect to get all the distinct days, and then add these columns, then use groupBy and sum:

// get distinct days from data (this assumes there are not too many of them):
val days: Array[String] = df.select("Day")
    .distinct()
    .collect()
    .map(_.getAs[String]("Day"))

// add column for each day with the Sale value if days match:
val withDayColumns = days.foldLeft(df) { 
    case (data, day) => data.selectExpr("*", s"IF(Day = '$day', Sales, 0) AS $day")
}

// wrap it up 
val result = withDayColumns
   .drop("Day")
   .drop("Sales")
   .groupBy("Customer")
   .sum(days: _*)

result.show()

Which prints (almost) what you wanted:

+--------+--------+--------+--------+--------+--------+--------+
|Customer|sum(Tue)|sum(Thu)|sum(Sun)|sum(Fri)|sum(Mon)|sum(Wed)|
+--------+--------+--------+--------+--------+--------+--------+
|       1|      10|      15|       0|       2|      12|       0|
|       2|       0|       4|      10|       3|       0|       5|
+--------+--------+--------+--------+--------+--------+--------+

I'll leave it to you to rename / reorder the columns if needed.

Lichtenfeld answered 25/3, 2016 at 7:49 Comment(3)
There is another solution also but it applies aggregate or aggregateByKey, but I've just woken up and it needs a big of brain gymnastics to write :)Pinsky
@Tzach Zohar thank you very much this works really good. It processed 3 GB data in 18 mins with 28 different distinct values in my original table(here it is day column). Once again thanks for your helpTepee
@Tzach Zohar, is there any another alternative if I have around 30,000 to 100,000 distinct values in second column (i.e Day in this example) for taking Transpose using scala. So after transpose i will have max 100,000 columns. Let me know if you have solution for large data set. Earlier solutions is working very well with small number of distinct values.Tepee
S
0

If you are working with python below code might help. Let's say you want to transpose spark DataFrame df:

pandas_df = df.toPandas().transpose().reset_index()
transposed_df = sqlContext.createDataFrame(pandas_df)
transposed_df.show()
Spirochete answered 21/3, 2017 at 11:8 Comment(0)
S
0

Consider a data frame which has 6 columns and we want to group by first 4 columns and pivot on col5 while aggregating on col6 (say sum on it). So lets say you cannot use the spark 1.6 version then the below code can be written (in spark 1.5) as:

val pivotedDf = df_to_pivot
    .groupBy(col1,col2,col3,col4)
    .pivot(col5)
    .agg(sum(col6))

Here is the code with same output but without using in-built pivot function:

import scala.collection.SortedMap

//Extracting the col5 distinct values to create the new columns
val distinctCol5Values = df_to_pivot
    .select(col(col5))
    .distinct
    .sort(col5)  // ensure that the output columns are in a consistent logical order
    .map(_.getString(0))
    .toArray
    .toSeq

//Grouping by the data frame to be pivoted on col1-col4
val pivotedAgg = df_to_pivot.rdd
      .groupBy{row=>(row.getString(col1Index),
      row.getDate(col2Index),
      row.getDate(col3Index),
      row.getString(col4Index))}

//Initializing a List of tuple of (String, double values) to be filled in the columns that will be created
val pivotColListTuple = distinctCol5Values.map(ft=> (ft,0.0))
// Using Sorted Map to ensure the order is maintained
var distinctCol5ValuesListMap = SortedMap(pivotColListTuple : _*)
//Pivoting the data on col5 by opening the grouped data
val pivotedRDD = pivotedAgg.map{groupedRow=>
    distinctCol5ValuesListMap = distinctCol5ValuesListMap.map(ft=> (ft._1,0.0))
     groupedRow._2.foreach{row=>
//Updating the distinctCol5ValuesListMap values to reflect the changes
//Change this part accordingly to what you want
       distinctCol5ValuesListMap = distinctCol5ValuesListMap.updated(row.getString(col5Index),
         distinctCol5ValuesListMap.getOrElse(row.getString(col5Index),0.0)+row.getDouble(col6Index))
     }        
    Row.fromSeq(Seq(groupedRow._1._1,groupedRow._1._2,groupedRow._1._3,groupedRow._1._4) ++ distinctCol5ValuesListMap.values.toSeq)
  }

//Consructing the structFields for new columns
val colTypesStruct = distinctCol5ValuesListMap.map(colName=>StructField(colName._1,DoubleType))
//Adding the first four column structFields with the new columns struct
val opStructType = StructType(Seq(StructField(col1Name,StringType),
                                     StructField(col2Name,DateType),
                                     StructField(col3Name,DateType),
                                     StructField(col4Name,StringType)) ++ colTypesStruct )

//Creating the final data frame
val pivotedDF = sqlContext.createDataFrame(pivotedRDD,opStructType)
Scorekeeper answered 21/3, 2018 at 12:24 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.