Spark DataFrame limit function takes too much time to show
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import findspark
from pyspark.sql.functions import countDistinct
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("users mobile related information analysis") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.executor.memory", "3g") \
    .config("spark.driver.maxResultSize", "1g") \
    .config("spark.executor.pyspark.memory", "3g") \
    .enableHiveSupport() \
    .getOrCreate()

handset_info = ora_tmp.select('some_value','some_value','some_value','some_value','some_value','some_value','some_value')

I configured Spark with 3 GB of executor memory and 3 GB of executor PySpark memory. My database has more than 70 million rows. When I call the

 handset_info.show()

method, it shows the top 20 rows within 2-5 seconds. But when I try to run the following code

mobile_info_df = handset_info.limit(30)
mobile_info_df.show()

to show the top 30 rows, it takes far too long (3-4 hours). Is it reasonable for it to take that much time? Is there any problem with my configuration? My laptop's configuration is:

  • Core i7 (4 cores) laptop with 8 GB RAM
Roundfaced answered 10/2, 2019 at 9:49 Comment(0)

Your configuration is fine. The huge difference in duration is caused by the underlying implementation. limit() reads all of the 70 million rows before it creates a DataFrame with 30 rows, whereas show() just takes the first 20 rows of the existing DataFrame and therefore only has to read those 20 rows. If you are just interested in showing 30 rows instead of 20, you can call the show() method with 30 as a parameter:

df.show(30, truncate=False)
Forestall answered 13/3, 2019 at 0:2 Comment(4)
Do you know why limit() works that way? It strikes me as rather wasteful...Miscalculate
What you said doesn't seem to be right, see: github.com/apache/spark/pull/15070Miscalculate
@Miscalculate I don't see how this PR is related to the OP's scenario. The OP creates two dataframes, mobile_info_df and handset_info. The first one should only contain 30 rows and the other one is not limited. That means the whole data needs to be read (and the show triggers the reading). I cannot test it right now because I don't have a Spark environment at the moment, but mobile_info_df = handset_info.limit(30) and handset_info = handset_info.limit(30) will probably lead to different execution plans. 1/2Forestall
@Miscalculate The latter will have much better performance, similar to show (as I said, probably! I have not tested it). Maybe you can check it yourself and edit my answer to improve it. 2/2Forestall
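
If you want to verify this yourself, DataFrame.explain() prints the physical plan of each variant. Below is a minimal sketch, assuming handset_info is the DataFrame from the question:

 # Compare how Spark plans the two variants (handset_info comes from the question).
 handset_info.explain()             # plan of the un-limited DataFrame that show() operates on
 handset_info.limit(30).explain()   # same plan plus a limit node (e.g. CollectLimit / GlobalLimit)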

Spark copies the parameter you passed to limit() to each partition so, in your case, it tries to read 30 rows per partition. I guess you happened to have a huge number of partitions (which is not good in any case). Try df.coalesce(1).limit(30).show() and it should run as fast as df.show().
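
As a quick check of this explanation, you can look at the partition count and compare the two variants yourself. A minimal sketch, assuming handset_info is the DataFrame from the question:

 # A very large partition count here supports the explanation above.
 print(handset_info.rdd.getNumPartitions())

 handset_info.limit(30).show()                 # slow when there are many partitions
 handset_info.coalesce(1).limit(30).show()     # usually much closer to plain show() in runtime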

Miscalculate answered 26/12, 2020 at 9:41 Comment(3)
I am not sure. Spark is usually used for big data, which can easily have thousands of partitions for parallelization. It is not that useful if you restrict it to a single partition (which also means a single core). In that case, you might as well just use pandas.Marandamarasca
But, the .coalesce(1).limit(30) does work much faster! Thanks!Marandamarasca
The coalesce() call only affects the planning of the subsequent calls; it doesn't mean your data is immediately put together into a big block ;-)Miscalculate

As you've already experienced, limit() on large data has terrible performance. I wanted to share a workaround for anyone else with this problem. If the limit count doesn't have to be exact, use sort() or orderBy() to sort on a column and use filter() to grab the top k% of rows.

Detail answered 28/7, 2020 at 21:57 Comment(2)
Could you please provide an example showing how to use sort() + filter() to grab the top 10% of rows?Yvetteyvon
@Yvetteyvon This wouldn't get you exactly x% because who knows how many rows meet the filter() condition, but you would do something like df.sort("score").filter("score > 0.9"). If you have some knowledge of your data distribution, you could get close to your desired x% or number of rows...Detail
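
Expanding the inline snippet from the comment above into a runnable sketch: it assumes a hypothetical DataFrame df with a numeric column "score" (neither is from the original question) and uses approxQuantile() to estimate the cut-off for roughly the top 10% of rows:

 from pyspark.sql import functions as F

 # df and its "score" column are hypothetical stand-ins, not from the question.
 threshold = df.approxQuantile("score", [0.9], 0.01)[0]      # approximate 90th-percentile cut-off
 top_rows = df.filter(F.col("score") >= threshold)           # roughly the top 10% of rows
 top_rows.orderBy(F.col("score").desc()).show(30, truncate=False)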
