How does the distinct() function work in Spark?

I'm a newbie to Apache Spark and am learning its basic functionality. I have a small doubt. Suppose I have an RDD of (key, value) tuples and want to obtain the unique ones. I use the distinct() function. I'm wondering: on what basis does the function consider tuples to be disparate? Is it based on the keys, the values, or both?

Pantia answered 20/6, 2015 at 23:47

.distinct() definitely does a shuffle across partitions. To see more of what's happening, run .toDebugString on your RDD.

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val hashPart = new HashPartitioner(<number of partitions>)

val myRDDPreStep = <load some RDD>

// de-duplicate, co-locate by key, and cache the result
val myRDD = myRDDPreStep.distinct
  .partitionBy(hashPart)
  .setName("myRDD")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)

For an example RDD of mine (myRDDPreStep is already hash-partitioned by key, persisted with StorageLevel.MEMORY_AND_DISK_SER, and checkpointed), this returns:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    |    ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
        |    myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
        |        CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
        |    myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]

Note that there may be more efficient ways to get distinct elements that involve fewer shuffles, especially if your RDD is already partitioned in a smart way and the partitions are not overly skewed.

See "Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?" and "Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?"
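As an illustration of the idea (a hedged sketch, not code from the linked answers): if equal tuples are already guaranteed to land in the same partition, then a per-partition de-duplication is globally correct and needs no further shuffle.

import org.apache.spark.HashPartitioner

// Hypothetical setup: key by the whole tuple so that duplicates co-locate.
val colocated = sc.parallelize(Seq((1, 20), (1, 20), (2, 22)))
  .map(t => (t, ()))
  .partitionBy(new HashPartitioner(4))
  .keys

// De-duplicate within each partition; no extra shuffle is needed
// because equal tuples already share a partition.
val distinctNoShuffle = colocated.mapPartitions { iter =>
  val seen = scala.collection.mutable.HashSet.empty[(Int, Int)]
  iter.filter(seen.add) // HashSet.add returns false for already-seen elements
}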

Infliction answered 29/6, 2015 at 23:23

The API docs for RDD.distinct() provide only a one-sentence description:

"Return a new RDD containing the distinct elements in this RDD."

From recent experience I can tell you that in an RDD of tuples, the tuple as a whole is considered.
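A quick way to convince yourself: Scala tuples compare element-wise, which is exactly what distinct() relies on.

(1, 20) == (1, 20) // true  -> collapsed into one element by distinct()
(1, 20) == (1, 21) // false -> both elements are kept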

If you want distinct keys or distinct values, then depending on exactly what you want to accomplish, you can either (both options are sketched below):

A. call groupByKey() to transform {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} into {(k1,[v11,v12]), (k2,[v21,v22])}; or

B. strip out either the keys or the values by calling keys() or values(), followed by distinct().
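A minimal sketch of both options, using a made-up pair RDD and assuming a SparkContext named sc:

val pairs = sc.parallelize(Seq(("k1", 11), ("k1", 12), ("k2", 21), ("k2", 22)))

// A. gather every value under its key
val grouped = pairs.groupByKey() // ("k1", [11, 12]), ("k2", [21, 22])

// B. strip to keys or values, then de-duplicate
val distinctKeys   = pairs.keys.distinct()   // "k1", "k2"
val distinctValues = pairs.values.distinct() // 11, 12, 21, 22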

As of this writing (June 2015), UC Berkeley and edX are running a free online course, Introduction to Big Data and Apache Spark, which provides hands-on practice with these functions.

Solmization answered 21/6, 2015 at 0:19
Hi Paul! Suppose we have an RDD of tuples as follows: (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22), etc. Here you can observe that both the keys and the values repeat across tuples. So if I apply distinct() to the above RDD, what would the result be? Please take a moment. Thank you! And yes, I was taking that course online! :) – Pantia
I don't have time right now, but you can set up your own RDD with myRDD = sc.parallelize([ (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22) ]); this may even work in one of the previous lab notebooks from the Spark course. Then run myRDD.distinct().collect() to test the output. – Solmization

Justin Pihony is right: distinct() uses the hashCode and equals methods of the objects to make this determination. It returns the distinct elements (objects).

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))

Distinct

rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)

If you want to apply distinct on the key only, then reduceByKey is a better option:

reduceByKey

val reduceRDD = rdd
  .map(tup => (tup._1, tup))        // key each tuple by its first element
  .reduceByKey { case (a, b) => a } // keep one tuple per key
  .map(_._2)                        // drop the temporary key

reduceRDD.collect().foreach(println)

Output:

(2,20)
(1,20)
(3,21)
Anemophilous answered 9/9, 2016 at 9:19

distinct uses the hashCode and equals methods of the objects for this determination. Tuples come built in with equality mechanisms that delegate down to the equality and position of each element, so distinct works against the entire Tuple2 object. As Paul pointed out, you can call keys or values and then distinct. Or you can write your own distinct values via aggregateByKey, which would keep the key pairing. Or, if you want the distinct keys, you could use a regular aggregate.
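A minimal sketch of both suggestions, using a made-up pair RDD (the names here are illustrative, not from the answer above):

val pairs = sc.parallelize(Seq((1, 20), (1, 20), (1, 21), (2, 22)))

// distinct values per key: a Set accumulator de-duplicates values
// while keeping the key pairing intact
val distinctValuesPerKey = pairs.aggregateByKey(Set.empty[Int])(
  (set, v) => set + v, // fold each value into the per-key set
  (s1, s2) => s1 ++ s2 // merge partial sets across partitions
)
// => (1, Set(20, 21)), (2, Set(22))

// distinct keys via a regular aggregate over the whole RDD
val distinctKeys = pairs.aggregate(Set.empty[Int])(
  (set, kv) => set + kv._1,
  (s1, s2) => s1 ++ s2
)
// => Set(1, 2)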

Hyperphagia answered 21/6, 2015 at 5:50

It looks like distinct() gets rid of (key, value) duplicates.

In the example below, (1,20) and (2,20) each appear twice in myRDD, but after distinct() the duplicates are removed.

scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22

scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)

scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
Aerodyne answered 24/10, 2015 at 23:28
