Transpose DataFrame Without Aggregation in Spark with Scala

I looked at a number of different solutions online, but could not find what I am trying to achieve. Please help me with this.

I am using Apache Spark 2.1.0 with Scala. Below is my dataframe:


+-----------+-------+
|COLUMN_NAME| VALUE |
+-----------+-------+
|col1       | val1  |
|col2       | val2  |
|col3       | val3  |
|col4       | val4  |
|col5       | val5  |
+-----------+-------+

I want this to be transposed, as below:


+-----+-------+-----+------+-----+
|col1 | col2  |col3 | col4 |col5 |
+-----+-------+-----+------+-----+
|val1 | val2  |val3 | val4 |val5 |
+-----+-------+-----+------+-----+
Vig answered 20/3, 2018 at 19:30 Comment(2)
What if two records have the same COLUMN_NAME but different VALUE? What should be the value then? And if you know there are no such repetitions, your dataframe is either very small (in which case you can just collect it and transform using plain Scala) or the result would have too many columns.Warrantable
Two records will never have the same column name. In fact, I am getting the table insert/update details coming in as multiple rows, one column with column names and the other with values, and my plan is to transpose them into a dataframe and update them directly into the Kudu database. The first column's values come in as the schema and the second column's values come in as the values, so I need to build a dataframe out of it. Please let me know if you have any other suggestions/thoughts.Vig

If your dataframe is small enough, as in the question, then you can collect COLUMN_NAME to form the schema and collect VALUE to form the row, and then create a new dataframe as

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

//creating the schema from the existing dataframe's COLUMN_NAME values
val schema = StructType(df.select(collect_list("COLUMN_NAME")).first().getAs[Seq[String]](0).map(x => StructField(x, StringType)))
//creating the RDD[Row] from the VALUE column
val values = sc.parallelize(Seq(Row.fromSeq(df.select(collect_list("VALUE")).first().getAs[Seq[String]](0))))
//creating the new dataframe
sqlContext.createDataFrame(values, schema).show(false)

which should give you

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
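
Since the question targets Spark 2.1, the same idea can also be written against the SparkSession entry point instead of sc and sqlContext. A minimal sketch, assuming a SparkSession named spark, a dataframe small enough to collect, and that the two collects return rows in the same order:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StringType, StructType}

// collect both columns to the driver (only safe for a small dataframe)
val names = df.select("COLUMN_NAME").collect().map(_.getString(0))
val vals  = df.select("VALUE").collect().map(_.getString(0))

// one StructField per collected column name, all typed as String
val schema = StructType(names.map(n => StructField(n, StringType)))
// a single Row holding all of the collected values
val rowRDD = spark.sparkContext.parallelize(Seq(Row.fromSeq(vals.toSeq)))

spark.createDataFrame(rowRDD, schema).show(false)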
Disoblige answered 21/3, 2018 at 10:27 Comment(3)
You're awesome, Ramesh! This is exactly what I need. Thank you so much for your help. Performance-wise this is better than pivot.Vig
Great to hear that @MarutiK, don't forget to upvote when you are eligible ;)Disoblige
How can we do this in PySpark?Virtuosity

You can do this using pivot, but you still need an aggregation; and what if you have multiple values for a COLUMN_NAME? One way to handle that case is sketched after the output below.

// first and toDF need these imports (already in scope in spark-shell)
import org.apache.spark.sql.functions.first
import spark.implicits._

val df = Seq(
  ("col1", "val1"),
  ("col2", "val2"),
  ("col3", "val3"),
  ("col4", "val4"),
  ("col5", "val5")
).toDF("COLUMN_NAME", "VALUE")

df
  .groupBy()
  .pivot("COLUMN_NAME").agg(first("VALUE"))
  .show()

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
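
If a COLUMN_NAME could repeat with different values, one way to keep them all is to aggregate each pivoted column into an array instead of picking the first value. A small sketch of that variant, reusing the df defined above (collect_list is the only change):

import org.apache.spark.sql.functions.collect_list

// each pivoted column now holds an array of every VALUE seen for that COLUMN_NAME
df
  .groupBy()
  .pivot("COLUMN_NAME")
  .agg(collect_list("VALUE"))
  .show(false)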

EDIT:

If your dataframe is really as small as in your example, you can collect it as a Map:

val map = df.as[(String,String)].collect().toMap

and then apply this answer
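
One way to then turn that Map into a single-row dataframe, without reproducing the linked answer, is to fold over the entries and add one literal column per key. A minimal sketch, assuming the keys are unique, the map fits on the driver, and spark.implicits._ is in scope (as in spark-shell); transposed is just a local name:

import org.apache.spark.sql.functions.lit

// start from a one-row dummy frame and add one column per map entry (key -> column name, value -> cell)
val transposed = map.foldLeft(Seq(1).toDF("dummy")) {
  case (acc, (colName, colValue)) => acc.withColumn(colName, lit(colValue))
}.drop("dummy")

transposed.show(false)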

Glorification answered 20/3, 2018 at 19:57 Comment(5)
Thank you so much for your quick response! Greatly appreciated! It worked :). The only thing is that, because of the pivot, it is slow.Vig
Hey Raphael, I know we can do this using Map as well, but I couldn't achieve the results. Please share if you have logic with Map.Vig
@MarutiK just call toSeq on your map, first, then apply my answerGlorification
I was able to do toSeq, but after that it fails at groupBy() for empty values. I am getting the error " <console>:46: error: not enough arguments for method groupBy: (f: ((String, String)) => K)scala.collection.immutable.Map[K,Seq[(String, String)]] ". Any advice on this?Vig
@RaphaelRoth Thanks for your answer. What if there is only one column? I tried this solution for it and I get an extra row, how can I get rid of it?Calefacient

Another solution, though lengthy, using crosstab.

val dfp = spark.sql(""" with t1 (
  select 'col1' c1, 'val1' c2 union all
  select 'col2' c1, 'val2' c2 union all
  select 'col3' c1, 'val3' c2 union all
  select 'col4' c1, 'val4' c2 union all
  select 'col5' c1, 'val5' c2
) select c1 COLUMN_NAME, c2 VALUE from t1
""")
dfp.show(50,false)

+-----------+-----+
|COLUMN_NAME|VALUE|
+-----------+-----+
|col1       |val1 |
|col2       |val2 |
|col3       |val3 |
|col4       |val4 |
|col5       |val5 |
+-----------+-----+

val dfp2 = dfp.groupBy("column_name").agg(first($"value") as "value").stat.crosstab("value", "column_name")
dfp2.show(false)

+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1             |1   |0   |0   |0   |0   |
|val3             |0   |0   |1   |0   |0   |
|val2             |0   |1   |0   |0   |0   |
|val5             |0   |0   |0   |0   |1   |
|val4             |0   |0   |0   |1   |0   |
+-----------------+----+----+----+----+----+

val needed_cols = dfp2.columns.drop(1)

needed_cols: Array[String] = Array(col1, col2, col3, col4, col5)

val dfp3 = needed_cols.foldLeft(dfp2) { (acc,x) => acc.withColumn(x,expr(s"case when ${x}=1 then value_column_name else 0 end")) }
dfp3.show(false)

+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1             |val1|0   |0   |0   |0   |
|val3             |0   |0   |val3|0   |0   |
|val2             |0   |val2|0   |0   |0   |
|val5             |0   |0   |0   |0   |val5|
|val4             |0   |0   |0   |val4|0   |
+-----------------+----+----+----+----+----+

dfp3.select( needed_cols.map( c => max(col(c)).as(c)) :_* ).show

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
Polysyndeton answered 18/12, 2020 at 17:53 Comment(0)
P
0

To enhance Ramesh Maharjan's answer, collect and then convert it to a map.

val mp = df.as[(String,String)].collect.toMap

With a dummy dataframe, we can then build it up using foldLeft:

import org.apache.spark.sql.functions.lit

val f = Seq("1").toDF("dummy")

mp.keys.toList.sorted.foldLeft(f) { (acc, x) => acc.withColumn(x, lit(mp(x))) }.drop("dummy").show(false)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
Polysyndeton answered 18/12, 2020 at 18:24 Comment(0)
