I'm a newbie to Apache Spark and have been learning its basic functionality. I have a small doubt: suppose I have an RDD of (key, value) tuples and want to obtain the unique ones. I use the distinct() function. I'm wondering on what basis the function considers tuples as distinct: the keys, the values, or both?
.distinct() is definitely doing a shuffle across partitions. To see more of what's happening, run a .toDebugString on your RDD:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)
For an example RDD of mine (where myRDDPreStep is already hash-partitioned by key, persisted with StorageLevel.MEMORY_AND_DISK_SER, and checkpointed), this returns:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
| myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
Note that there may be more efficient ways to get distinct values that involve fewer shuffles, especially if your RDD is already partitioned in a smart way and the partitions are not overly skewed.
See "Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?" and "Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?".
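As an illustration of the "fewer shuffles" idea, here is a minimal sketch of my own (not code from those linked questions). It assumes myRDDPreStep from above is already hash-partitioned by key, so all copies of any given (key, value) tuple sit in the same partition and duplicates can be dropped per partition without another shuffle:

// Sketch only: assumes duplicates are co-located because the RDD is already
// partitioned by key, and that each partition fits in memory as a Set.
val noShuffleDistinct = myRDDPreStep.mapPartitions(
  iter => iter.toSet.iterator,   // drop duplicates within the partition
  preservesPartitioning = true   // keep the existing partitioner
)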
The API docs for RDD.distinct() only provide a one-sentence description:
"Return a new RDD containing the distinct elements in this RDD."
From recent experience I can tell you that in an RDD of tuples the tuple as a whole is considered.
If you want distinct keys or distinct values, then depending on exactly what you want to accomplish, you can either:
A. call groupByKey() to transform {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} to {(k1,[v11,v12]), (k2,[v21,v22])}; or
B. strip out either the keys or values by calling keys() or values(), followed by distinct().
Both options are sketched below.
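Here is a small sketch of both options (my own illustration, using made-up sample data similar to the other examples in this thread):

val pairs = sc.parallelize(Seq((1, 20), (1, 21), (1, 20), (2, 20), (2, 22)))

// A. group values per key (note groupByKey keeps duplicate values: (1, [20, 21, 20]))
val grouped = pairs.groupByKey()

// B. strip out keys or values, then deduplicate
val distinctKeys = pairs.keys.distinct()       // 1, 2
val distinctValues = pairs.values.distinct()   // 20, 21, 22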
As of this writing (June 2015), UC Berkeley + edX are running a free online course, Introduction to Big Data and Apache Spark, which provides hands-on practice with these functions.
myRDD = sc.parallelize([(1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)])
This may even work in one of the previous Lab notebooks from the Spark course. Then run myRDD.distinct().collect() to test the output.
Justin Pihony is right: distinct uses the hashCode and equals methods of the objects for this determination. It returns the distinct elements (objects).
val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
Distinct
rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
If you want to apply distinct on the key only, reduceByKey is a better option.
ReduceByKey
val reduceRDD = rdd.map(tup => (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2)
reduceRDD.collect().foreach(println)
Output:
(2,20)
(1,20)
(3,21)
distinct uses the hashCode and equals methods of the objects for this determination. Tuples come built in with equality mechanisms that delegate down into the equality and position of each element. So, distinct will work against the entire Tuple2 object. As Paul pointed out, you can call keys or values and then distinct. Or you can write your own distinct values via aggregateByKey, which would keep the key pairing. Or, if you want the distinct keys, you could use a regular aggregate.
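Here is a minimal sketch of those last two ideas (my own code, not the answerer's), using the same small sample data as the other answers:

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))

// distinct values per key via aggregateByKey, which keeps the key pairing
val distinctValuesPerKey = rdd.aggregateByKey(Set.empty[Int])(
  (set, v) => set + v,   // fold each value into a per-partition set
  (a, b) => a ++ b       // merge partial sets across partitions
)                        // e.g. (1, Set(20, 21)), (2, Set(20, 22)), (3, Set(21, 22))

// distinct keys via a regular aggregate (result is collected on the driver)
val distinctKeys = rdd.aggregate(Set.empty[Int])(
  (set, kv) => set + kv._1,
  (a, b) => a ++ b
)                        // Set(1, 2, 3)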
It looks like distinct will get rid of (key, value) duplicates.
In the example below, (1,20) and (2,20) each appear twice in myRDD, but after a distinct(), the duplicates are removed.
scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22
scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)
scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)