How can I calculate exact median with Apache Spark?

This page contains some statistics functions (mean, stdev, variance, etc.), but it does not include the median. How can I calculate the exact median?

Leroy answered 26/1, 2015 at 21:4 Comment(1)
percentile_approx does not give the exact median for groups with an even number of entries. A possible workaround is to take the average of the 50th percentile and the next value. In PySpark, something like the following works: ((F.percentile_approx('val', 0.5) + F.percentile_approx('val', 0.500000000001)) * .5).alias('med_val2')Loop
D
19

You need to sort the RDD and take the element in the middle, or the average of the two middle elements when the count is even. Here is an example with RDD[Int]:

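  import org.apache.spark.SparkContext._ // brings the PairRDD implicits (lookup) into scope
  import org.apache.spark.rdd.RDD

  val rdd: RDD[Int] = ???

  // Sort, then key each element by its position so it can be looked up by index.
  // Cache the result: count() and each lookup() would otherwise recompute the sort.
  val sorted = rdd.sortBy(identity).zipWithIndex().map {
    case (v, idx) => (idx, v)
  }.cache()

  val count = sorted.count()

  // Assumes a non-empty RDD. For an even count, average the two middle elements;
  // converting to Double before adding avoids Int overflow.
  val median: Double = if (count % 2 == 0) {
    val l = count / 2 - 1
    val r = l + 1
    (sorted.lookup(l).head.toDouble + sorted.lookup(r).head) / 2
  } else sorted.lookup(count / 2).head.toDouble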
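
The index arithmetic can be checked without a cluster. The following is a minimal, Spark-free sketch that applies the same even/odd rule to a local sequence that is already sorted; the object and method names are hypothetical and only serve this illustration.

```scala
object MedianCheck {
  // Hypothetical helper: median of an already sorted, non-empty local sequence,
  // using the same middle-index rule as the RDD version.
  def medianOfSorted(sorted: IndexedSeq[Int]): Double = {
    require(sorted.nonEmpty, "median of an empty collection is undefined")
    val count = sorted.length
    if (count % 2 == 0) {
      val l = count / 2 - 1 // lower middle index
      val r = l + 1         // upper middle index
      // Convert to Double before adding to avoid Int overflow.
      (sorted(l).toDouble + sorted(r)) / 2
    } else sorted(count / 2).toDouble
  }
}
```

For example, `MedianCheck.medianOfSorted(Vector(1, 2, 3, 4))` gives 2.5 and `MedianCheck.medianOfSorted(Vector(1, 3, 5))` gives 3.0.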
Doubloon answered 26/1, 2015 at 23:31 Comment(7)
What is this "lookup" method? AFAIK it does not exist in RDD.Neath
@javadba yeah, you need to import SparkContext._ to bring PairRDD implicits in scopeDoubloon
p.s. I think that there are faster algorithms for finding median that don't require full sorting (en.wikipedia.org/wiki/Selection_algorithm)Morelli
Unfortunately, they are not applicable to a distributed RDD.Doubloon
Can the DataFrame API be used instead of the RDD API?Autointoxication
Yes, see #31433343Dyeing
@EugeneZhulenev It would be better to persist the sorted RDD so that it won't recompute the DAG while doing lookups.Lengthwise
G
7

Using Spark 2.0+ and the DataFrame API you can use the approxQuantile method:

def approxQuantile(col: String, probabilities: Array[Double], relativeError: Double)

Since Spark 2.2 it also works on multiple columns at once. Setting probabilities to Array(0.5) and relativeError to 0 computes the exact median. From the documentation:

The relative target precision to achieve (greater than or equal to 0). If set to zero, the exact quantiles are computed, which could be very expensive.

Despite this, there seem to be some precision issues when relativeError is set to 0; see the question here. A small error close to 0 works better in some cases, depending on the Spark version.


A small working example which calculates the median of the numbers from 1 to 99 (both inclusive) and uses a low relativeError:

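// Assumes `spark` is a SparkSession; spark-shell already has these implicits in scope.
import spark.implicits._

val df = (1 to 99).toDF("num")
val median = df.stat.approxQuantile("num", Array(0.5), 0.001)(0)
println(median)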

The median returned is 50.0.
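
The multi-column overload mentioned above (Spark 2.2+) takes an array of column names and returns one inner array per column. Here is a minimal sketch of how it could be used; the column names `a` and `b`, the helper `columnMedians`, the object name, and the local SparkSession settings are assumptions made for this illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiColumnMedianSketch {
  // Hypothetical helper: median estimate for each named numeric column.
  def columnMedians(df: DataFrame, cols: Seq[String], relativeError: Double = 0.001): Map[String, Double] = {
    // One inner array per column, with one entry per requested probability.
    val quantiles = df.stat.approxQuantile(cols.toArray, Array(0.5), relativeError)
    cols.zip(quantiles.map(_.head).toSeq).toMap
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("median-sketch").getOrCreate()
    import spark.implicits._

    val df = (1 to 99).map(i => (i, i * 2.0)).toDF("a", "b")
    columnMedians(df, Seq("a", "b")).foreach { case (name, m) => println(s"$name: $m") }

    spark.stop()
  }
}
```

Like the single-column call, this returns values that occur in the data, so for a column with an even number of rows it does not average the two middle elements.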

Gudrin answered 14/12, 2017 at 3:44 Comment(4)
Monica, do you know why I get NameError: name 'Array' is not defined when I run your code? This does not seem to be a package I need to import.Almost
@mathlover: Are you using Scala? Maybe you overwrite the name somewhere with a variable?Gudrin
I am using PySparkAlmost
@mathlover: Then it's not surprising you can't use a Scala version straight off. You need to adapt it a bit.Gudrin
