How do I put a case class in an rdd and have it act like a tuple(pair)?
Asked Answered
P

2

10

Say for example, I have a simple case class

case class Foo(k:String, v1:String, v2:String)

Can I get spark to recognise this as a tuple for the purposes of something like this, without converting to a tuple in, say a map or keyBy step.

val rdd = sc.parallelize(List(Foo("k", "v1", "v2")))
// Swap values
rdd.mapValues(v => (v._2, v._1))

I don't even care if it looses the original case class after such an operation. I've tried the following with no luck. I'm fairly new to Scala, am I missing something?

case class Foo(k:String, v1:String, v2:String)
  extends Tuple2[String, (String, String)](k, (v1, v2))

edit: In the above snippet the case class extends Tuple2, this does not produce the desired effect that the RDD class and functions do not treat it like a tuple and allow PairRDDFunctions, such as mapValues, values, reduceByKey, etc.

Pforzheim answered 20/1, 2016 at 15:42 Comment(7)
Possible duplicate of In Scala, is there an easy way to convert a case class into a tuple?Barraza
No, I'm aware of that. When given a tuple spark normally allows extra operations such as mapValues, but this extension of tuple doesn't get thosePforzheim
So I guess I'm trying to figure out what you actually want to do here. Are you saying that you have a case class and you want to perform tuple-like functions on that case class without actually turning it into a tuple?Agnes
Yes, the case class extends tuple, but for some reason in spark it doesn't allow me access to the PairRDDFunctionsPforzheim
Problem is that case classes are not designed for inheritance in the first place and case-to-case inheritance is not allowed. Could you explain what is the point of all of that if you don't even care if it looses the original case class after such an operation?Geminate
I think my point is if I pass a case class which extends Tuple2 to an RDD why does it not behave the same as passing a Tuple2 directly?Pforzheim
How are you getting that to compile in the first place? Tuple2 is a case class, and so it can't be extended by another case class.Blackamoor
F
10

Extending TupleN isn't a good idea for a number of reasons, with one of the best being the fact that it's deprecated, and on 2.11 it's not even possible to extend TupleN with a case class. Even if you make your Foo a non-case class, defining it on 2.11 with -deprecation will show you this: "warning: inheritance from class Tuple2 in package scala is deprecated: Tuples will be made final in a future version.".

If what you care about is convenience of use and you don't mind the (almost certainly negligible) overhead of the conversion to a tuple, you can enrich a RDD[Foo] with the syntax provided by PairRDDFunctions with a conversion like this:

import org.apache.spark.rdd.{ PairRDDFunctions, RDD }

case class Foo(k: String, v1: String, v2: String)

implicit def fooToPairRDDFunctions[K, V]
  (rdd: RDD[Foo]): PairRDDFunctions[String, (String, String)] =
    new PairRDDFunctions(
      rdd.map {
        case Foo(k, v1, v2) => k -> (v1, v2)
      }
    )

And then:

scala> val rdd = sc.parallelize(List(Foo("a", "b", "c"), Foo("d", "e", "f")))
rdd: org.apache.spark.rdd.RDD[Foo] = ParallelCollectionRDD[6] at parallelize at <console>:34

scala> rdd.mapValues(_._1).first
res0: (String, String) = (a,b)

The reason your version with Foo extending Tuple2[String, (String, String)] doesn't work is that RDD.rddToPairRDDFunctions targets an RDD[Tuple2[K, V]] and RDD isn't covariant in its type parameter, so an RDD[Foo] isn't a RDD[Tuple2[K, V]]. A simpler example might make this clearer:

case class Box[A](a: A)

class Foo(k: String, v: String) extends Tuple2[String, String](k, v)

class PairBoxFunctions(box: Box[(String, String)]) {
  def pairValue: String = box.a._2
}

implicit def toPairBoxFunctions(box: Box[(String, String)]): PairBoxFunctions =
  new PairBoxFunctions(box)

And then:

scala> Box(("a", "b")).pairValue
res0: String = b

scala> Box(new Foo("a", "b")).pairValue
<console>:16: error: value pairValue is not a member of Box[Foo]
       Box(new Foo("a", "b")).pairValue
                              ^

But if you make Box covariant…

case class Box[+A](a: A)

class Foo(k: String, v: String) extends Tuple2[String, String](k, v)

class PairBoxFunctions(box: Box[(String, String)]) {
  def pairValue: String = box.a._2
}

implicit def toPairBoxFunctions(box: Box[(String, String)]): PairBoxFunctions =
  new PairBoxFunctions(box)

…everything's fine:

scala> Box(("a", "b")).pairValue
res0: String = b

scala> Box(new Foo("a", "b")).pairValue
res1: String = b

You can't make RDD covariant, though, so defining your own implicit conversion to add the syntax is your best bet. Personally I'd probably choose to do the conversion explicitly, but this is a relatively un-horrible use of implicit conversions.

Fall answered 20/1, 2016 at 16:45 Comment(0)
B
0

Not sure if I get your question right, but let say you have a case class

import org.apache.spark.rdd.RDD

case class DataFormat(id: Int, name: String, value: Double)
val data: Seq[(Int, String, Double)] = Seq(
   (1, "Joe", 0.1),
   (2, "Mike", 0.3)
)
val rdd: RDD[DataFormat] = (
    sc.parallelize(data).map(x=>DataFormat(x._1, x._2, x._3))
)

// Print all data
rdd.foreach(println)

// Print only names
rdd.map(x=>x.name).foreach(println)
Bernete answered 10/2, 2021 at 12:38 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.