Find median in Spark SQL for multiple double datatype columns
I have a requirement to find the median for multiple double datatype columns. Please suggest the correct approach.

Below is my sample dataset with one column. I expect the median to be returned as 1 for this sample.

  scala> sqlContext.sql("select num from test").show();
+---+
|num|
+---+
|0.0|
|0.0|
|1.0|
|1.0|
|1.0|
|1.0|
+---+

I tried the following options:

1) Hive UDAF percentile: it worked only for BigInt.

2) Hive UDAF percentile_approx: it does not work as expected (returns 0.25 instead of 1).

sqlContext.sql("select percentile_approx(num,0.5) from test").show();

+----+
| _c0|
+----+
|0.25|
+----+

3) Spark window function percent_rank: the way I see to find the median is to take the num value at the largest percent_rank that does not exceed 0.5 (roughly the query sketched after the table below). But this does not work in all cases, especially with even record counts, where the median is the average of the two middle values in the sorted distribution.

Also, with percent_rank, since I have to find the median for multiple columns, I would have to calculate it in separate DataFrames, which seems to me a slightly complex method. Please correct me if my understanding is not right.

+---+------------+
|num|percent_rank|
+---+------------+
|0.0|         0.0|
|0.0|         0.0|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
+---+------------+
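
Roughly, the query I mean is sketched below (it picks the num value with the largest percent_rank not exceeding 0.5, so for an even record count it returns one of the two middle values instead of their average):

sqlContext.sql("""
  select num as median
  from (select num, percent_rank() over (order by num) as pr from test) t
  where pr <= 0.5
  order by pr desc
  limit 1""").show()
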
Bernabernadene answered 30/12, 2016 at 23:38 Comment(0)
Out of curiosity, which version of Apache Spark are you using? There were some fixes in Apache Spark 2.0+ that included changes to approxQuantile.

If I were to run the pySpark code snippet below:

rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")

with the median calculation using approxQuantile as:

df.approxQuantile("num", [0.5], 0.25)

or

spark.sql("select percentile_approx(num, 0.5) from df").show()

the results are:

  • Spark 2.0.0: 0.25
  • Spark 2.0.1: 1.0
  • Spark 2.1.0: 1.0
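
For reference, if you do move to Spark 2.0+, the same calculation is also exposed to Scala through DataFrameStatFunctions. A minimal sketch, assuming an equivalent Scala DataFrame df (per the API docs, a relative error of 0.0 computes the quantile with no approximation error, at a higher cost):

// Scala counterpart of the pySpark approxQuantile call above (Spark 2.0+).
// The third argument is the relative error; 0.0 removes the approximation error,
// but the result is still a single element of the column, so an even row count
// is not averaged.
val medianNum: Double = df.stat.approxQuantile("num", Array(0.5), 0.0).head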

Note that these are approximate numbers (via approxQuantile), though in general this should work well. If you need the exact median, one approach is to use numpy.median. The code snippet below is updated for this df example, based on gench's SO response to How to find the median in Apache Spark with Python Dataframe API?:

from pyspark.sql.types import *
import pyspark.sql.functions as F
import numpy as np

def find_median(values):
    try:
        median = np.median(values) #get the median of values in a list in each row
        return round(float(median),2)
    except Exception:
        return None #if there is anything wrong with the given values

median_finder = F.udf(find_median,FloatType())

df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))

# print out
df2.show()

with the output of:

+---+--------------------+------+
| id|                nums|median|
+---+--------------------+------+
|  1|[0.0, 0.0, 1.0, 1...|   1.0|
+---+--------------------+------+
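
Since the original requirement is the median of several double columns, the same collect_list-plus-UDF pattern can be applied per column in a single pass. A rough Scala sketch (assuming Spark 2.0+, or a HiveContext on 1.6 for collect_list, an equivalent Scala DataFrame df with an id grouping column, and a hypothetical list doubleCols of your column names):

import org.apache.spark.sql.functions._

// exact median of a collected list (Scala analogue of the numpy UDF above);
// assumes each group produces a non-empty list
val medianUdf = udf { (values: Seq[Double]) =>
  val sorted = values.sorted
  val n = sorted.size
  if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0 else sorted(n / 2)
}

// one collect_list per double column, then one median column per list
val doubleCols = Seq("num")  // replace with your list of double columns
val listCols = doubleCols.map(c => collect_list(col(c)).alias(s"${c}_list"))

val grouped = df.groupBy("id").agg(listCols.head, listCols.tail: _*)
val withMedians = doubleCols.foldLeft(grouped) { (acc, c) =>
  acc.withColumn(s"${c}_median", medianUdf(col(s"${c}_list")))
}
withMedians.show()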

Updated: Spark 1.6 Scala version using RDDs

If you are using Spark 1.6, you can calculate the median in Scala via Eugene Zhulenev's response to How can I calculate the exact median with Apache Spark. Below is the code, modified to work with our example.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val rdd: RDD[Double] = sc.parallelize(Seq(0.0, 0.0, 1.0, 1.0, 1.0, 1.0))

// sort the values and index them so the middle element(s) can be looked up
val sorted = rdd.sortBy(identity).zipWithIndex().map {
  case (v, idx) => (idx, v)
}

val count = sorted.count()

// even count: average the two middle values; odd count: take the middle value
val median: Double = if (count % 2 == 0) {
  val l = count / 2 - 1
  val r = l + 1
  (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
} else sorted.lookup(count / 2).head.toDouble

with the output of:

// output
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0

Note, this calculates the exact median using RDDs - i.e. you will need to convert the DataFrame column into an RDD to perform this calculation, as sketched below.
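
A small sketch of that conversion, wrapping the logic above into a helper that can be called once per double column (hypothetical name columnMedian; assumes df is your Scala DataFrame and the column contains no nulls):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// exact median of a single double column, reusing the sort + lookup approach above;
// note this launches a couple of Spark jobs per column
def columnMedian(df: DataFrame, colName: String): Double = {
  val rdd: RDD[Double] = df.select(colName).rdd.map(_.getDouble(0))
  val sorted = rdd.sortBy(identity).zipWithIndex().map { case (v, idx) => (idx, v) }
  val count = sorted.count()
  if (count % 2 == 0) {
    val l = count / 2 - 1
    (sorted.lookup(l).head + sorted.lookup(l + 1).head) / 2
  } else {
    sorted.lookup(count / 2).head
  }
}

// e.g. collect the medians for a list of double columns into a map
val medians = Seq("num").map(c => c -> columnMedian(df, c)).toMap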

Cilia answered 31/12, 2016 at 5:30 Comment(6)
Thanks Denny for the pointers. I am using Spark 1.6.0 and Scala (2.10.5) for the application. I think percentile_approx and approxQuantile are then not options for me. Do you know of any Scala DataFrame API, like numpy, for calculating the median?Bernabernadene
Got it - I'll update my answer to include one that works for Spark 1.6 via Scala code.Cilia
It is getting trickier: I need to find the median for multiple double datatype columns after grouping the data by other columns in the dataset. I have edited the question with this information. Appreciate your kind help.Bernabernadene
Glad to help you tackle this, but instead of changing the original question and tagging it as unanswered, could you create a new question with a sample data set that corresponds to it?Cilia
Posted the query as a new question: #41431770Bernabernadene
Thanks - will review shortly!Cilia
