Count number of rows in an RDD

I'm using Spark with Java, and I have an RDD of 5 million rows. Is there a solution that allows me to calculate the number of rows in my RDD? I've tried RDD.count(), but it takes a lot of time. I've seen that I can use the function fold, but I couldn't find Java documentation for it. Could you please show me how to use it, or show me another solution to get the number of rows of my RDD?

Here is my code:

JavaPairRDD<String, String> lines = getAllCustomers(sc).cache();
JavaPairRDD<String, String> CFIDNotNull = lines.filter(notNull()).cache();
JavaPairRDD<String, Tuple2<String, String>> join = lines.join(CFIDNotNull).cache();

double count_ctid = (double) join.count(); // I want to get the count of these three RDDs
double all = (double) lines.count();
double count_cfid = all - CFIDNotNull.count();
System.out.println("********** :" + count_cfid * 100 / all + "% and now : " + count_ctid * 100 / all + "%");

Thank you.

Anastice answered 9/2, 2015 at 15:37 Comment(0)

You had the right idea: use rdd.count() to count the number of rows. There is no faster way.

I think the question you should have asked is: why is rdd.count() so slow?

The answer is that rdd.count() is an "action": an eager operation that has to return an actual number. The RDD operations you performed before count() were "transformations": they transform one RDD into another lazily. In effect, the transformations were not actually performed, just queued up. When you call count(), you force all of those pending lazy operations to run. The input files need to be loaded now, the map()s and filter()s executed, the shuffles performed, and so on, until finally we have the data and can say how many rows it has.
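
As a minimal sketch of this behavior (assuming a JavaSparkContext named sc and an illustrative input path), the transformations below only build up the pipeline; nothing runs until count() is called:

// Nothing is read or computed yet: textFile(), filter() and map() are lazy transformations.
JavaRDD<String> raw = sc.textFile("hdfs:///data/customers.txt");
JavaRDD<String> nonEmpty = raw.filter(line -> !line.isEmpty());
JavaRDD<Integer> lengths = nonEmpty.map(String::length);

// count() is an action: only now is the file loaded and the whole pipeline executed.
long rows = lengths.count();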

Note that if you call count() twice, all of this will happen twice. After the count is returned, all the data is discarded! If you want to avoid that, call cache() on the RDD. Then the second call to count() will be fast, and RDDs derived from it will also be faster to calculate. The trade-off is that the RDD now has to be stored in memory (or on disk).
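
For illustration, a sketch of the caching pattern just described (the path and variable names are placeholders):

// cache() only marks the RDD for storage; it is itself lazy.
JavaRDD<String> rdd = sc.textFile("hdfs:///data/customers.txt").cache();

long first = rdd.count();  // slow: runs the full pipeline and stores the partitions in memory
long second = rdd.count(); // fast: served from the cached partitions

// Release the memory once the cached data is no longer needed.
rdd.unpersist();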

Bharal answered 9/2, 2015 at 15:51 Comment(2)
@Daniel Darabos For profiling the time taken by logically different tasks (reading, transforming and writing) in my application, I needed to bypass Spark's lazy evaluation, so I inserted some df.cache.count calls in my code. Can this significantly impact performance and/or have other implications? I'm on Spark 2.3.0 and using Scala 2.11.11Unwitnessed
I think it likely has a significant performance impact. If you add caching, the data will be stored and retrieved at that point; even without serialization this is not a trivial overhead. But I don't know a better way to do what you are trying to do either. Your benchmark should still be representative of the time taken by the distinct tasks, and you can also benchmark against the no-caching version to see the overall effect of caching.Bharal
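
A rough Java sketch of the forced-evaluation timing pattern discussed in these comments (the input RDD and the mapping step are placeholders; the comment's df.cache.count is the Scala DataFrame equivalent):

// Force evaluation at a stage boundary so the stage can be timed on its own.
long start = System.nanoTime();
JavaRDD<String> transformed = input.map(String::toUpperCase).cache();
transformed.count(); // action: runs (and caches) the transformation now
long elapsedMs = (System.nanoTime() - start) / 1_000_000;
System.out.println("transform stage took " + elapsedMs + " ms");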

Daniel's explanation of count() is right on the money. If you are willing to accept an approximation, though, you could try the countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] RDD method. (Note that this is tagged "Experimental".)
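
In the Java API the counterpart is JavaRDD.countApprox(long timeout, double confidence), which returns a PartialResult<BoundedDouble>. A minimal sketch, assuming a JavaRDD named rdd (the 1000 ms timeout is an arbitrary choice here):

import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;

// Ask for an approximate count, waiting at most 1000 ms, with 95% confidence.
PartialResult<BoundedDouble> result = rdd.countApprox(1000L, 0.95);
BoundedDouble bounded = result.initialValue();
System.out.println("count ~ " + bounded.mean()
    + " (between " + bounded.low() + " and " + bounded.high() + ")");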

Suiting answered 11/2, 2015 at 15:58 Comment(0)
