How to sort an RDD in Scala Spark?

Reading the Spark method sortByKey:

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

Is it possible to return just the top N results instead of all of them, say the top 10? I could convert the sorted collection to an Array and use the take method, but since this is an O(N) operation, is there a more efficient method?
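
For concreteness, here is roughly what I have now (just a sketch; the data is made up, sc is the SparkContext, and the usual pair-RDD implicits are assumed to be in scope):

  // Hypothetical pair RDD, sorted by key across the whole dataset...
  val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
  val sorted = pairs.sortByKey(ascending = true)

  // ...and then only the first 10 results are pulled back to the driver.
  val top10 = sorted.take(10)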

Longwood answered 23/5, 2014 at 21:32 Comment(1)
So you know how to sort, and you are asking how to take the top N. Can I suggest editing the question summary? – Angie

Most likely you have already perused the source code:

  class OrderedRDDFunctions {
    // <snip>
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
      // Range-partition the data by key: this is the shuffle.
      val part = new RangePartitioner(numPartitions, self, ascending)
      val shuffled = new ShuffledRDD[K, V, P](self, part)
      // Then sort each partition locally.
      shuffled.mapPartitions(iter => {
        val buf = iter.toArray
        if (ascending) {
          buf.sortWith((x, y) => x._1 < y._1).iterator
        } else {
          buf.sortWith((x, y) => x._1 > y._1).iterator
        }
      }, preservesPartitioning = true)
    }
  }

And, as you say, all of the data must go through the shuffle stage, as seen in the snippet.

However, the concern about subsequently invoking take(K) may not be warranted: that operation does NOT scan through all N items:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = {

So then, it would seem:

O(myRdd.take(K)) << O(myRdd.sortByKey()) ~= O(myRdd.sortByKey().take(K)) (at least for small K) << O(myRdd.sortByKey().collect())
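
As a rough illustration of the two access patterns (a sketch only; the RDD contents and the value of K are made up, and the pair-RDD implicits are assumed to be in scope):

  // Hypothetical pair RDD; only the key matters for the ordering here.
  val myRdd = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

  // Full range-partitioned sort (a shuffle), then take the first K on the driver.
  // take(K) itself only scans as many partitions as it needs.
  val viaSort = myRdd.sortByKey().take(2)

  // An alternative that skips the full sort: keep the K smallest elements per
  // partition and merge them, ordering the pairs by key.
  val viaTakeOrdered = myRdd.takeOrdered(2)(Ordering.by[(Int, String), Int](_._1))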

Scharf answered 24/5, 2014 at 7:23 Comment(2)
sortByKey(), like other RDD transformations, is subject to lazy evaluation. Would sortByKey().take(k) be optimized into takeOrdered(k, func), or into take(k).sortByKey()? I thought that was the whole point of lazy evaluation, so that physical plans can be optimized. Could it be implemented better in DataFrames? – Janinejanis
@Ruslan I do not believe that such rearrangements / optimizations presently occur within Spark core. I am only aware of similar optimizations happening within the SQL/Catalyst optimizer. – Scharf

If you only need the top 10, use rdd.top(10). It avoids sorting, so it is faster.

rdd.top makes one parallel pass through the data, collecting the top N of each partition in a heap, then merges the heaps. It is an O(rdd.count) operation. Sorting would be O(rdd.count log rdd.count) and would incur a lot of data transfer: it does a shuffle, so all of the data would be transmitted over the network.
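
A minimal sketch (the data and the Ordering below are just illustrative assumptions):

  // Plain RDD: top(10) returns the 10 largest elements; no full sort or shuffle.
  val nums = sc.parallelize(1 to 1000)
  val largest = nums.top(10)

  // For a pair RDD, pass an Ordering that defines what "top" means,
  // e.g. the 2 pairs with the largest values.
  val pairs = sc.parallelize(Seq(("a", 5), ("b", 1), ("c", 9)))
  val topByValue = pairs.top(2)(Ordering.by[(String, Int), Int](_._2))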

Angie answered 14/6, 2014 at 0:20 Comment(4)
I did not know about this method. It is a better solution than sort(), so this is a better answer to the question than mine (though mine does perhaps provide some useful background). I am upvoting. – Scharf
Hi, I have a pairRdd; is there a way to use the top method on this pairRdd? For example, top(10) would return 10 elements for each key in this pairRdd. I really need to know this. – Frieze
No, it does not work like that. I suggest a separate question for finding the top 10 by key, as it's a bigger topic. – Angie
spark.apache.org/docs/latest/api/scala/… – Angie

Another option, at least from PySpark 1.2.0, is the use of takeOrdered.

In ascending order:

rdd.takeOrdered(10)

In descending order:

rdd.takeOrdered(10, lambda x: -x)

Top k by value for (k, v) pairs:

rdd.takeOrdered(10, key=lambda kv: -kv[1])
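
For the Scala API, a rough equivalent would look like this (the RDDs below are made-up examples):

  val rdd = sc.parallelize(Seq(10, 4, 2, 12, 3))
  val pairRdd = sc.parallelize(Seq(("a", 5), ("b", 1), ("c", 9)))

  // Ascending order (the 10 smallest elements):
  rdd.takeOrdered(10)

  // Descending order: reverse the element ordering.
  rdd.takeOrdered(10)(Ordering[Int].reverse)

  // Top 10 (k, v) pairs by value:
  pairRdd.takeOrdered(10)(Ordering.by[(String, Int), Int](_._2).reverse)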
Diastema answered 19/6, 2015 at 17:12 Comment(0)
