Spark - repartition() vs coalesce()
L

20

422

According to Learning Spark

Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions.

One difference I get is that with repartition() the number of partitions can be increased/decreased, but with coalesce() the number of partitions can only be decreased.

If the partitions are spread across multiple machines and coalesce() is run, how can it avoid data movement?

Lining answered 24/7, 2015 at 12:49 Comment(1)
I want to use coalesce(), but how do you know beforehand that the dataframe you are going to call coalesce(100) on has more than 100 partitions? Also, if the code does not change, will PySpark always produce a fixed number of partitions before the coalesce()? Could you provide a reference if there is one?Pinette
D
522

It avoids a full shuffle. If it's known that the number of partitions is decreasing, then the executors can safely keep data on the minimum number of partitions, only moving the data off the extra nodes onto the nodes that are kept.

So, it would go something like this:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

Then coalesce down to 2 partitions:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

Notice that Node 1 and Node 3 did not require their original data to move.
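
If you want to see this for yourself, here is a minimal spark-shell sketch (assuming the shell's sc; the exact grouping coalesce chooses depends on locality, so it may differ from the node layout above):

// Build an RDD with the numbers 1-12 in 4 partitions, mirroring the layout above.
val rdd = sc.parallelize(1 to 12, 4)
rdd.glom().collect().foreach(p => println(p.mkString(",")))   // print each partition's contents

// coalesce(2) merges existing partitions through a narrow dependency -- no full shuffle.
val merged = rdd.coalesce(2)
merged.glom().collect().foreach(p => println(p.mkString(",")))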

Dingbat answered 24/7, 2015 at 14:13 Comment(14)
Thanks for the response. The documentation should have better said minimize data movement instead of avoiding data movement.Lining
Is there any case when repartition should be use instead of coalesce?Mascara
@Niemand I think the current documentation covers this pretty well: github.com/apache/spark/blob/… Keep in mind that all repartition does is call coalesce with the shuffle parameter set to true. Let me know if that helps.Dingbat
Is it possible to reduce number of partition files that are existing ? I have no hdfs, but problem with many files.Deluge
@JustinPihony, Thanks for the good answer. The example you gave in the answer has 12 elements in 4 partitions, so will repartition(2) be equal to coalesce(2)? Or will repartition(2) be slower than coalesce(2)?Though
repartition will be statistically slower since it doesn't know that it is shrinking...although maybe they could optimize that. Internally it just calls coalesce with a shuffle = true flagDingbat
@Niemand - Yes, I have production code that runs faster with repartition than coalesce. repartition divides the data equally, so it's faster to output to files and run analyses. See my answer for more details.Therm
So would coalesce be considered a narrow or a wide transformation?Abundant
Neither, it's not a transformation at all - it just deals with where the data is located @AbundantDingbat
@JustinPihony It creates a new RDD, doesn't it? If so, that's the definition of a transformation...Abundant
Fair enough - I had recalled it being inline also, but reviewing the source - the docs even say it's narrow - github.com/apache/spark/blob/master/core/src/main/scala/org/…Dingbat
do we have a more optimized version of coalesce?Postgraduate
@Mascara - repartition may also be desired during a job if the partitions are skewed and you want the next stage tasks to run on partitions with similar size.Alexanderalexandr
So, the coalesce can't shuffle data?Innocence
T
321

Justin's answer is awesome and this response goes into more depth.

The repartition algorithm does a full shuffle and creates new partitions with data that's distributed evenly. Let's create a DataFrame with the numbers from 1 to 12.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf contains 4 partitions on my machine.

numbersDf.rdd.partitions.size // => 4

Here is how the data is divided on the partitions:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

Let's do a full-shuffle with the repartition method and get this data on two nodes.

val numbersDfR = numbersDf.repartition(2)

Here is how the numbersDfR data is partitioned on my machine:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

The repartition method makes new partitions and evenly distributes the data in the new partitions (the data distribution is more even for larger data sets).
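
If you want to reproduce a listing like this yourself, one way (not shown in the original answer) is the built-in spark_partition_id function, which tags each row with the partition it ended up in:

import org.apache.spark.sql.functions.spark_partition_id

// Tag every row with the id of the partition it landed in after the repartition.
numbersDfR
  .withColumn("partition_id", spark_partition_id())
  .orderBy("partition_id", "number")
  .show()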

Difference between coalesce and repartition

coalesce uses existing partitions to minimize the amount of data that's shuffled. repartition creates new partitions and does a full shuffle. coalesce results in partitions with different amounts of data (sometimes partitions of very different sizes), whereas repartition results in roughly equal-sized partitions.

Is coalesce or repartition faster?

coalesce may run faster than repartition, but unequal sized partitions are generally slower to work with than equal sized partitions. You'll usually need to repartition datasets after filtering a large data set. I've found repartition to be faster overall because Spark is built to work with equal sized partitions.

N.B. I've curiously observed that repartition can increase the size of data on disk. Make sure to run tests when you're using repartition / coalesce on large datasets.

Read this blog post if you'd like even more details.

When you'll use coalesce & repartition in practice

Therm answered 5/12, 2016 at 20:54 Comment(16)
Great answer @Powers, but isn't the data in Partition A and B skewed? How is it evenly distributed?Horrocks
Also, what's the best way to get the partition size without getting OOM error. I use rdd.glom().map(len).collect() but it gives lot of OOM errors.Horrocks
@Horrocks - Partition A and Partition B are different sizes because the repartition algorithm doesn't distribute data as equally for very small data sets. I used repartition to organize 5 million records into 13 partitions and each file was between 89.3 MB and 89.6 MB - that's pretty equal!Therm
What about the other question in my second comment? :)Horrocks
@anwartheravian- I unfortunately don't know the answer to your second comment. I'd love to know the answer! Let me know if you ever find the answer :)Therm
Found a better way hereHorrocks
Partition A: 1, 3, 4, 6, 7, 9, 10, 12 and Partition B: 2, 5, 8, 11 isn't much evenly distributed ... is it ?Cranwell
@ShariqueAbdullah- It's not too evenly distributed cause the data set used in this example is tiny. It's pretty even for large data sets ;)Therm
my final spark dataframe is the outcome of a lot of joins, so the output on HDFS has a large number of empty and small files. So which would be faster, repartition(1) or coalesce(1)?Fenn
@Fenn - repartition(1) and coalesce(1) should have similar performance.Therm
Should we really use repartition(1) or coalesce(1)? Will it be a major performance issue?Fenn
What will happen if I call coalesce(1) or repartition(1) on an RDD of 100 partitions distributed among multiple nodes? Will coalesce not shuffle the data to a single node?Masterpiece
I just replaced coalesce(1) with repartition(1), and reduced my run from 4 and a half hours to 18 minutes. This was for the result of a JOIN of a huge CSV with a smallish CSV. Most of these 18 minutes were the 16 minutes required to read the big CSV, so the difference between coalesce(1) and repartition(1) is even bigger (2 minutes vs 4 hours and 15 minutes).Hapsburg
coalesce(1) should never be used because it cannot scale beyond a single thread/node. repartition(1) will cause a shuffle and have more steps, but coalesce(1) is always processed on a single node, so with a large dataset that means performance absolutely tanks.Brotherinlaw
How can I check the partitions that each element ended up in? I was trying to copy the code in my spark-shell but I couldn't figure out how to do it. I'm looking for something like this Partition A: 1, 3, 4, 6, 7, 9, 10, 12 Partition B: 2, 5, 8, 11Archery
Is the number of partitions of numbersDf fixed? Will each run of the same code result in the same number of partitions?Pinette
R
51

repartition - it's recommended when increasing the number of partitions, because it involves shuffling all the data.

coalesce - it's recommended when reducing the number of partitions. For example, if you have 3 partitions and you want to reduce it to 2, coalesce will move the 3rd partition's data to partitions 1 and 2. Partitions 1 and 2 will remain in the same container. On the other hand, repartition will shuffle data across all the partitions, therefore the network usage between the executors will be high and it will impact the performance.

coalesce performs better than repartition while reducing the number of partitions.

Rasorial answered 31/8, 2018 at 7:14 Comment(3)
Useful Explanation.Bromoform
@Kamalesan C - very good explanation in simple words, I wish I could upvote this answer more than once.Gateshead
You didn't mention: Spark will effectively push down the coalesce operation to as early a point as possible. This means it can affect (decrease) parallelism in earlier operations.Franconia
J
36

One additional point to note here is that the basic principle of Spark RDDs is immutability. repartition or coalesce will create a new RDD; the base RDD will continue to exist with its original number of partitions. If the use case demands persisting the RDD in cache, then the same has to be done for the newly created RDD.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] = MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2
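
Since the base RDD is untouched, the usual pattern is to capture the result in a new value and cache that one; a minimal sketch (pairMrkt is the RDD from the snippet above):

// repartition returns a new RDD; pairMrkt itself still has its original 2 partitions.
val repartitioned = pairMrkt.repartition(10)
repartitioned.cache()                       // persist the new RDD, not the base one
println(repartitioned.partitions.length)    // 10
println(pairMrkt.partitions.length)         // 2
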
Josettejosey answered 21/8, 2016 at 15:44 Comment(3)
nice one! this is critical and at least to this experienced scala dev, not obvious--ie, neither repartition nor coalesce attempt to modify the data, just how it is distributed across nodesFlor
@Harikrishnan So if I understood the other answers properly, then in the case of coalesce Spark uses existing partitions. However, since an RDD is immutable, can you describe how coalesce makes use of existing partitions? As per my understanding, I thought Spark appends new partitions to the existing partitions in coalesce.Sommersommers
But if the "old" RDD is not used anymore as is known by the execution graph it will be cleared from memory if not persisted, won't it?Chlamydeous
P
21

There is a use case for repartition >> coalesce even when the partition number decreases, mentioned in @Rob's answer: writing data to a single file.

@Rob's answer hints in the right direction, but I think some further explanation is needed to understand what's going on under the hood.

If you need to filter your data before writing, then repartition is much more suitable than coalesce, since coalesce will be pushed-down right before the loading operation.

For instance: load().map(…).filter(…).coalesce(1).save()

translates to: load().coalesce(1).map(…).filter(…).save()

This means that all your data will collapse into a single partition, where it will be filtered, losing all parallelism. This happens even for very simple filters like column='value'.

This does not happen with repartition: load().map(…).filter(…).repartition(1).save()

In that case, filtering happens in parallel on the original partitions.

Just to give an order of magnitude, in my case when filtering 109M rows (~105G) with ~1000 partitions after loading from a Hive table, the runtime dropped from the ~6h for coalesce(1) to ~2m for repartition(1).

The specific example is taken from this article from AirBnB, which is pretty good and covers even more aspects of repartitioning techniques in Spark.
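
To check how the coalesce push-down described above behaves on your Spark version (a comment below notes it may differ), you can compare the physical plans of the two variants; a sketch with a hypothetical df and column name:

import org.apache.spark.sql.functions.col

// coalesce(1) shows up as a Coalesce node (no stage boundary), so the filter runs inside the same single-task stage;
// repartition(1) shows up as an Exchange (shuffle), so the filter still runs with the original parallelism.
df.filter(col("status") === "active").coalesce(1).explain()
df.filter(col("status") === "active").repartition(1).explain()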

Planimeter answered 27/11, 2020 at 13:25 Comment(2)
I tried this on a CSV file with Spark v2.4.5, but I could see the proper sequence in the DAG; the coalesce is not coming first. Could you please add some more details, like the DAG or physical plan, to show that coalesce will be pushed down first?Ferris
Found exactly the same thing happening to me due to filters.Wolver
R
19

What follows from the code and code docs is that coalesce(n) is the same as coalesce(n, shuffle = false) and repartition(n) is the same as coalesce(n, shuffle = true).

Thus, both coalesce and repartition can be used to increase the number of partitions.

With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large.

Another important note to accentuate is that if you drastically decrease the number of partitions, you should consider using the shuffled version of coalesce (same as repartition in that case). This will allow your computations to be performed in parallel on the parent partitions (multiple tasks).

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
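
At the RDD level this is easy to observe; a minimal sketch (assuming sc from a spark-shell):

val rdd = sc.parallelize(1 to 1000, 100)

// Default (shuffle = false): a drastic coalesce collapses the upstream work into a single task.
rdd.coalesce(1).getNumPartitions                    // 1

// shuffle = true keeps the upstream 100 tasks and adds a shuffle step -- same as repartition(1).
rdd.coalesce(1, shuffle = true).getNumPartitions    // 1

// With shuffle = true, coalesce can even increase the partition count.
rdd.coalesce(200, shuffle = true).getNumPartitions  // 200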

Please also refer to the related answer here

Rohrer answered 19/7, 2019 at 12:20 Comment(0)
D
16

All the answers add some great knowledge to this very frequently asked question.

So, going by the tradition of this question's timeline, here are my 2 cents.

I found repartition to be faster than coalesce, in a very specific case.

In my application, when the number of files that we estimate is lower than a certain threshold, repartition works faster.

Here is what I mean

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

In the above snippet, if my files numbered fewer than 20, coalesce was taking forever to finish while repartition was much faster, hence the above code.

Of course, this number (20) will depend on the number of workers and amount of data.

Hope that helps.

Dispensable answered 21/6, 2017 at 19:53 Comment(1)
rather than setting it to a hard number like 20, it likely makes sense to compare the number of files to the number of nodes in the cluster. You can get the number of executors with this line of code: sc._jsc.sc().getExecutorMemoryStatus().size() Where sc is a pyspark SparkContext object. If you're in scala or java it's even simpler: sc.getExecutorMemoryStatus().size()Brotherinlaw
U
11

Repartition: Shuffle the data into a NEW number of partitions.

E.g. the initial data frame is partitioned into 200 partitions.

df.repartition(500): Data will be shuffled from 200 partitions to new 500 partitions.

Coalesce: Shuffle the data into an existing number of partitions.

df.coalesce(5): Data will be shuffled from the remaining 195 partitions into the 5 existing partitions.
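
A quick way to verify these counts (df stands for the 200-partition data frame in the example):

df.rdd.getNumPartitions                     // 200
df.repartition(500).rdd.getNumPartitions    // 500
df.coalesce(5).rdd.getNumPartitions         // 5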

Uneventful answered 26/9, 2019 at 7:13 Comment(0)
J
8

I would like to add to Justin's and Powers' answers that:

repartition will ignore existing partitions and create new ones. So you can use it to fix data skew. You can specify partition keys to define the distribution (see the sketch below). Data skew is one of the biggest problems in the 'big data' problem space.

coalesce will work with existing partitions and shuffle a subset of them. It can't fix data skew as well as repartition does. Therefore, even though it is less expensive, it might not be the thing you need.
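
A sketch of the repartition variants mentioned above (df and the column name are hypothetical):

import org.apache.spark.sql.functions.col

// Plain repartition: full shuffle into evenly sized (round-robin) partitions.
val even   = df.repartition(400)
// Repartition by expression: hash-partitions rows by the given key column(s).
val byKey  = df.repartition(col("customer_id"))
val byBoth = df.repartition(200, col("customer_id"))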

Jackanapes answered 7/2, 2019 at 18:8 Comment(0)
I
8

Basically, repartition allows you to increase or decrease the number of partitions. Repartition redistributes the data from all the partitions, and this leads to a full shuffle, which is a very expensive operation.

Coalesce is the optimized version of repartition, where you can only reduce the number of partitions. Since we are only able to reduce the number of partitions, what it does is merge some of the partitions into single partitions. By merging partitions, the movement of data across partitions is lower compared to repartition. So coalesce involves minimal data movement, but saying that coalesce does no data movement at all is a completely wrong statement.

The other thing is that repartition, given a number of partitions, tries to redistribute the data uniformly across all the partitions, while with coalesce we could still have skewed data in some cases.

Incommode answered 5/2, 2021 at 6:47 Comment(0)
C
7

To all the great answers I would like to add that repartition is one of the best options to take advantage of data parallelism, while coalesce gives a cheap option to reduce the number of partitions and is very useful when writing data to HDFS or some other sink to take advantage of large writes.

I have found this useful when writing data in Parquet format to get the full advantage.
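
A sketch of that pattern (the output path and the target of 16 files are hypothetical):

// Cheaply reduce the number of output files before a big write; coalesce avoids a full shuffle here.
df.coalesce(16)
  .write
  .mode("overwrite")
  .parquet("hdfs:///data/events/")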

Club answered 15/2, 2019 at 14:27 Comment(0)
R
5

For someone who had issues generating a single CSV file from PySpark (AWS EMR) as an output and saving it on S3, using repartition helped. The reason being, coalesce cannot do a full shuffle, but repartition can. Essentially, you can increase or decrease the number of partitions using repartition, but you can only decrease the number of partitions using coalesce. Here is the code for anyone who is trying to write a CSV from AWS EMR to S3:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')
Radium answered 29/5, 2019 at 20:52 Comment(0)
L
5
  • Coalesce uses existing partitions to minimize the amount of data that is shuffled. Repartition creates new partitions and does a full shuffle.

  • Coalesce results in partitions with different amounts of data (sometimes partitions of very different sizes), while repartition results in roughly equal-sized partitions.

  • With coalesce we can only decrease the number of partitions, but repartition can be used to both increase and decrease the number of partitions.

Liana answered 26/3, 2021 at 16:40 Comment(0)
F
3

Here are some additional details/differences at the code level:

I'm adding only the function definitions here; for the full code implementation, check Spark's GitHub page.

Below are the different methods available for repartition on a dataframe: check the full implementation here.

def repartition(numPartitions: Int): Dataset[T]

Whenever we call the above method on a dataframe, it returns a new Dataset that has exactly numPartitions partitions.

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]

The above method returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is hash partitioned.

def repartition(partitionExprs: Column*): Dataset[T]

The above method returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as the number of partitions. The resulting Dataset is hash partitioned.

def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

The above method returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is range partitioned.

def repartitionByRange(partitionExprs: Column*): Dataset[T]

The above method returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as the number of partitions. The resulting Dataset is range partitioned.

But for coalesce we have only the below method on a dataframe:

def coalesce(numPartitions: Int): Dataset[T]

The above method will return a new Dataset that has exactly numPartitions partitions.

Below are the methods available for repartition and coalesce on an RDD: check the full implementation here.

def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null): RDD[T]

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

Basically, the repartition method calls the coalesce method with the shuffle value set to true. So if we use the coalesce method on an RDD and pass shuffle = true, we can increase the number of partitions too!

Ferris answered 13/6, 2022 at 12:28 Comment(0)
S
2

Also, you should make sure that the nodes receiving the coalesced data are highly configured if you are dealing with huge data, because all the data will be loaded onto those nodes, which may lead to memory exceptions. Though repartition is costly, I prefer to use it, since it shuffles and distributes the data equally.

Be wise when selecting between coalesce and repartition.

Scenario answered 30/8, 2018 at 13:10 Comment(0)
K
2

The repartition algorithm does a full shuffle of the data and creates equal sized partitions of data. coalesce combines existing partitions to avoid a full shuffle.

Coalesce works well for taking an RDD with a lot of partitions and combining partitions on a single worker node to produce a final RDD with fewer partitions.

Repartition will reshuffle the data in your RDD to produce the final number of partitions you request. The partitioning of DataFrames seems like a low level implementation detail that should be managed by the framework, but it’s not. When filtering large DataFrames into smaller ones, you should almost always repartition the data. You’ll probably be filtering large DataFrames into smaller ones frequently, so get used to repartitioning.
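
A sketch of the filter-then-repartition habit described above (data frame and column names are hypothetical):

import org.apache.spark.sql.functions.col

// After a selective filter, most of the original partitions are nearly empty;
// repartition rebalances the surviving rows before further work or writing.
val small = big.filter(col("country") === "NZ")
val balanced = small.repartition(8)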

Read this blog post if you'd like even more details.

Kiel answered 16/5, 2020 at 14:55 Comment(0)
F
1

In a simple way, COALESCE:- only decreases the number of partitions; there is no shuffling of data, it just compacts the partitions.

REPARTITION:- can both increase and decrease the number of partitions, but shuffling takes place.

Example:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

Both work fine.

But we generally go for these two when we need to see the output in one cluster.

Fulbert answered 24/8, 2017 at 6:46 Comment(1)
There will be movement of data with coalesce as well.Invariable
S
0

Another difference to take into consideration is a situation where there is a skewed join and you have to coalesce on top of it. A repartition will solve the skewed join in most cases; then you can do the coalesce.

Another situation is, suppose you have saved a medium/large volume of data in a data frame and you have to produce to Kafka in batches. A repartition helps with collectAsList before producing to Kafka in certain cases. But when the volume is really high, the repartition will likely cause a serious performance impact. In that case, producing to Kafka directly from the dataframe would help.

Side note: coalesce does not avoid data movement in the sense of avoiding all data movement between workers. It does reduce the amount of shuffling that happens, though. I think that's what the book means.

Shaikh answered 4/11, 2020 at 19:4 Comment(0)
Q
-1

Coalesce performs better than repartition. Coalesce always decreases the number of partitions. Suppose you enable dynamic allocation in YARN and you have four partitions and executors. If a filter is applied, it is possible that one or more executors end up empty, having no data. This problem can be solved by coalesce rather than repartition.

Quadratics answered 11/7, 2021 at 19:2 Comment(0)
H
-3

Coalesce -- can increase or decrease the partitions. Repartition -- can only increase the partitions.

But I would say performance is purely based on the use case; coalesce is not always better than repartition.

Hospers answered 11/7, 2022 at 6:23 Comment(1)
this is absolutely wrong and should be deletedMayday
