Spark: Difference between collect(), take() and show() outputs after conversion toDF

I am using Spark 1.5.

I have a column of 30 ids which I am loading as integers from a database:

val numsRDD = sqlContext
     .table(constants.SOURCE_DB + "." + IDS)
     .select("id")
     .distinct
     .map(row=>row.getInt(0))

This is the output of numsRDD:

numsRDD.collect.foreach(println(_))

643761
30673603
30736590
30773400
30832624
31104189
31598495
31723487
32776244
32801792
32879386
32981901
33469224
34213505
34709608
37136455
37260344
37471301
37573190
37578690
37582274
37600896
37608984
37616677
37618105
37644500
37647770
37648497
37720353
37741608

Next, I want to produce all combinations of 3 of those ids, save each combination as a tuple of the form <tripletID: String, triplet: Array[Int]>, and convert the result into a DataFrame, which I do as follows:

// |combinationsDF| = 4060 combinations
val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","), 
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")
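
The local part of this pipeline (everything inside parallelize) can be checked without Spark. What follows is only a minimal sketch using plain Scala collections; `CombinationCheck`, `sampleIds` and `triplets` are hypothetical names, and `sampleIds` holds five of the ids listed above.

```scala
// Plain Scala, no Spark required. All names here are illustrative assumptions.
object CombinationCheck {
  // Five of the ids from the numsRDD output above, deliberately unsorted.
  val sampleIds: Seq[Int] = Seq(32776244, 643761, 37136455, 37582274, 30673603)

  // Same shape as the question: sort each triple, then pair the
  // comma-joined id string with the triple itself.
  def triplets(ids: Seq[Int]): Vector[(String, Seq[Int])] =
    ids.combinations(3)
      .map(_.sorted)
      .map(t => (t.mkString(","), t))
      .toVector

  def main(args: Array[String]): Unit = {
    val result = triplets(sampleIds)
    println(result.length)                 // C(5, 3) = 10
    println(triplets(1 to 30).length)      // C(30, 3) = 4060
    // Every id string should be rebuildable from its own triple.
    println(result.forall { case (id, t) => id == t.mkString(",") })
  }
}
```

With 30 distinct ids the same function yields C(30, 3) = 4060 pairs, which matches the count noted in the comment at the top of the snippet above.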

Right after that, I print some of combinationsDF's contents to make sure that everything is as it should be:

combinationsDF.show

which returns:

+--------------------+--------------------+
|           tripletID|             triplet|
+--------------------+--------------------+
|,37136455,3758227...|[32776244, 371364...|
|,37136455,3761667...|[32776244, 371364...|
|,32776244,3713645...|[31723487, 327762...|
|,37136455,3757869...|[32776244, 371364...|
|,32776244,3713645...|[31598495, 327762...|
|,37136455,3760089...|[32776244, 371364...|
|,37136455,3764849...|[32776244, 371364...|
|,37136455,3764450...|[32776244, 371364...|
|,37136455,3747130...|[32776244, 371364...|
|,32981901,3713645...|[32776244, 329819...|
|,37136455,3761810...|[32776244, 371364...|
|,34213505,3713645...|[32776244, 342135...|
|,37136455,3726034...|[32776244, 371364...|
|,37136455,3772035...|[32776244, 371364...|
|2776244,37136455...|[643761, 32776244...|
|,37136455,3764777...|[32776244, 371364...|
|,37136455,3760898...|[32776244, 371364...|
|,32879386,3713645...|[32776244, 328793...|
|,32776244,3713645...|[31104189, 327762...|
|,32776244,3713645...|[30736590, 327762...|
+--------------------+--------------------+
only showing top 20 rows
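
By default, show shortens any cell longer than 20 characters to its first 17 characters followed by `...`, so losing the tail of a long id is expected. The plain-Scala sketch below mimics that rule to show what an intact id would look like after shortening; `ShowTruncation` and `truncateCell` are hypothetical names, not Spark APIs.

```scala
// Plain Scala, no Spark required. `truncateCell` only imitates the default
// cell-shortening rule of show; it is an illustrative helper.
object ShowTruncation {
  def truncateCell(cell: String): String =
    if (cell.length > 20) cell.substring(0, 17) + "..." else cell

  def main(args: Array[String]): Unit = {
    // An intact id built from three of the ids listed above (26 characters).
    val intact = "32776244,37136455,37582274"
    println(truncateCell(intact))   // 32776244,37136455...
    // Cells of 20 characters or fewer are left unchanged.
    println(truncateCell("0,1,2"))  // 0,1,2
  }
}
```

The first displayed row above reads `,37136455,3758227...`, which starts with a comma rather than with `32776244`, so the display shortening alone does not account for it.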

As is evident, the first element of every tripletID is missing. To be 100% sure, I use take(20) as follows:

combinationsDF.take(20).foreach(println(_))

which returns a more detailed representation as per below:

[,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[2776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]

So now I am sure that the first id of each tripletID is being dropped for some reason. Yet if I use collect instead of take(20):

combinationsDF.collect.foreach(println(_))

everything goes back to being fine again (!!!):

[32776244,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[32776244,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[31723487,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[32776244,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[31598495,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[32776244,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[32776244,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[32776244,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[32776244,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[32776244,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[32776244,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[32776244,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[32776244,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[32776244,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[643761,32776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[32776244,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[32776244,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[32776244,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[31104189,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[30736590,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
...

1. I have exhaustively checked the steps just before I parallelize the array of combinations into an RDD, and everything is ok.
2. I have also printed the output right after parallelize is applied, and again everything is ok.
3. The problem appears to be related to the conversion of the RDD to a DF, and despite my best efforts I cannot resolve it.
4. I was also unable to reproduce the problem with mock data using the same code snippet.
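
One way to narrow this down is to compare each tripletID against the string rebuilt from its own triplet, since the array column looks intact in every output above. This is only a sketch in plain Scala: `TripletConsistency` and `inconsistent` are hypothetical names, and turning the rows returned by take or collect into `(String, Seq[Int])` pairs is assumed to happen elsewhere.

```scala
// Plain Scala, no Spark required. All names here are illustrative assumptions.
object TripletConsistency {
  // Returns the rows whose id string differs from the comma-joined triplet.
  def inconsistent(rows: Seq[(String, Seq[Int])]): Seq[(String, Seq[Int])] =
    rows.filter { case (id, triplet) => id != triplet.mkString(",") }

  def main(args: Array[String]): Unit = {
    // First row as printed by take(20) and by collect in the outputs above.
    val fromTake    = Seq((",37136455,37582274", Seq(32776244, 37136455, 37582274)))
    val fromCollect = Seq(("32776244,37136455,37582274", Seq(32776244, 37136455, 37582274)))

    println(inconsistent(fromTake).size)     // 1: the id lost its first element
    println(inconsistent(fromCollect).size)  // 0: id and triplet agree
  }
}
```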

So, first: what is causing this problem? And second: how do I fix it?

Bang answered 6/12, 2016 at 16:31 Comment(3)
It would be very helpful if you created a minimal reproducible example of this problem for us to run locally. – Cardiomegaly
Ok, will try to do so asap. The problem is that I was not able to reproduce the issue like that in a separate project... – Bang
I added the output of the original input, so that should suffice for reproducing the problem, I guess. – Bang

I would check your original numsRDD; it looks like you might have an empty string or null value in there. This works for me:

scala> val numsRDD = sc.parallelize(0 to 30)
numsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> :pa
// Entering paste mode (ctrl-D to finish)

val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","),
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")

// Exiting paste mode, now interpreting.

combinationsDF: org.apache.spark.sql.DataFrame = [tripletID: string, triplet: array<int>]

scala> combinationsDF.show
+---------+----------+
|tripletID|   triplet|
+---------+----------+
|    0,1,2| [0, 1, 2]|
|    0,1,3| [0, 1, 3]|
|    0,1,4| [0, 1, 4]|
|    0,1,5| [0, 1, 5]|
|    0,1,6| [0, 1, 6]|
|    0,1,7| [0, 1, 7]|
|    0,1,8| [0, 1, 8]|
|    0,1,9| [0, 1, 9]|
|   0,1,10|[0, 1, 10]|
|   0,1,11|[0, 1, 11]|
|   0,1,12|[0, 1, 12]|
|   0,1,13|[0, 1, 13]|
|   0,1,14|[0, 1, 14]|
|   0,1,15|[0, 1, 15]|
|   0,1,16|[0, 1, 16]|
|   0,1,17|[0, 1, 17]|
|   0,1,18|[0, 1, 18]|
|   0,1,19|[0, 1, 19]|
|   0,1,20|[0, 1, 20]|
|   0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows

The only other thing I can think of is mkString not working like you would expect. Try out this string interpolation (also no need to recreate the List):

val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map{case Array(a,b,c) => (  // each sorted row is an Array[Int], so match on Array
        s"$a,$b,$c", 
        Array(a,b,c))}
  .toDF("tripletID","triplet")

scala> combinationsDF.show
+---------+----------+
|tripletID|   triplet|
+---------+----------+
|    0,1,2| [0, 1, 2]|
|    0,1,3| [0, 1, 3]|
|    0,1,4| [0, 1, 4]|
|    0,1,5| [0, 1, 5]|
|    0,1,6| [0, 1, 6]|
|    0,1,7| [0, 1, 7]|
|    0,1,8| [0, 1, 8]|
|    0,1,9| [0, 1, 9]|
|   0,1,10|[0, 1, 10]|
|   0,1,11|[0, 1, 11]|
|   0,1,12|[0, 1, 12]|
|   0,1,13|[0, 1, 13]|
|   0,1,14|[0, 1, 14]|
|   0,1,15|[0, 1, 15]|
|   0,1,16|[0, 1, 16]|
|   0,1,17|[0, 1, 17]|
|   0,1,18|[0, 1, 18]|
|   0,1,19|[0, 1, 19]|
|   0,1,20|[0, 1, 20]|
|   0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows
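
For what it is worth, the two ways of building the id string can be compared without Spark. A minimal sketch in plain Scala, where `IdFormatting`, `viaMkString` and `viaInterpolation` are hypothetical names introduced only for this comparison:

```scala
// Plain Scala, no Spark required. All names here are illustrative assumptions.
object IdFormatting {
  // The approach from the question: join the elements with commas.
  def viaMkString(t: Seq[Int]): String = t.mkString(",")

  // The approach suggested above: destructure and interpolate.
  def viaInterpolation(t: Seq[Int]): String = t match {
    case Seq(a, b, c) => s"$a,$b,$c"
    case other =>
      throw new IllegalArgumentException(s"expected 3 elements, got ${other.length}")
  }

  def main(args: Array[String]): Unit = {
    val triplet = Seq(643761, 32776244, 37136455)
    println(viaMkString(triplet))       // 643761,32776244,37136455
    println(viaInterpolation(triplet))  // 643761,32776244,37136455
  }
}
```

Both functions return the same string for a three-element triple, so on the local side the choice between them should not change the result.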
Killjoy answered 6/12, 2016 at 16:58 Comment(12)
Thanks, but I did, and it is as it should be. Also note that when I use collect the problem "goes away". – Bang
Hmm, I'll keep digging. Could you provide the foreach{println} on numsRDD? – Killjoy
Will do asap - currently commuting. :-) – Bang
I updated the problem description to include what you asked for. Can you reproduce the problem? Or better, do you have any problems at all when trying my code? If not, maybe I should really look into the original table... – Bang
Is that a blank line at the beginning? Try this instead: numsRDD.foreach(x => println(s"Value is: $x")) – Killjoy
Unfortunately, I can't try that from home (just left work). I need to be at the company to access the cluster. We will have to wait. I did .count the items, however, and there are 30. I checked the contents of the table on the server as well and they seem ok... I will be back at work on Monday, when I will be able to try this again. – Bang
Ha, sounds good. I guess we will all have to wait in suspense until then. – Killjoy
Tried it again today and it is definitely the case that I have 30 items, none of them empty, null, "" or " ". I really don't know what to make of this... – Bang
Hmm, yeah, I am unable to replicate it then, so I am not sure what the issue is. – Killjoy
Very weird man... Will try to see it through and post here again if I find out what is causing it. – Bang
Are you still able to replicate it? – Killjoy
Yes - the same issue appears all the time. The difference between take and collect persists. – Bang
  1. df.show() prints only the content, as a formatted table.

e.g.

df.show()
Out[11]: 
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
  2. df.collect() returns the content together with its structure/metadata, as a list of Row objects.

e.g.

df.collect()
Out[11]:
[Row(age=None, name=u'Michael'),
Row(age=30, name=u'Andy'),
Row(age=19, name=u'Justin')]
  3. df.take(n) can be used to show content and structure/metadata for a limited number of rows of a very large dataset. Note that the output is flattened onto a single line.

e.g. to see only the first two rows of the dataframe

df.take(2)
Out[13]: 
[Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy')]
Shend answered 17/11, 2017 at 3:12 Comment(0)