pyspark: Efficiently have partitionBy write to same number of total partitions as original table

I had a question related to pyspark's partitionBy() function, which I originally posted in a comment on this question. I was asked to post it as a separate question, so here it is:

I understand that df.write.partitionBy(COL) will write all the rows with each value of COL to their own folder, and that each folder will (assuming the rows were previously distributed across all the partitions by some other key) have roughly the same number of files as were previously in the entire table. I find this behavior annoying. If I have a large table with 500 partitions and I use partitionBy(COL) on some attribute column, I now have, for example, 100 folders, each containing 500 (now very small) files.

What I would like is the partitionBy(COL) behavior, but with roughly the same file size and number of files as I had originally.

As a demonstration, the previous question shares a toy example where you have a table with 10 partitions and do partitionBy(dayOfWeek), and now you have 70 files because there are 10 in each folder. I would want ~10 files: one for each day, and maybe 2 or 3 for days that have more data.

Can this be easily accomplished? Something like df.repartition(COL).write.partitionBy(COL) seems like it might work, but I worry that (in the case of a very large table which is about to be partitioned into many folders) having to first combine it to some small number of partitions before doing the partitionBy(COL) seems like a bad idea.

Any suggestions are greatly appreciated!

Jolynnjon answered 9/6, 2018 at 15:35 Comment(0)

You've got several options. In my code below I'll assume you want to write in parquet, but of course you can change that.

(1) df.repartition(numPartitions, *cols).write.partitionBy(*cols).parquet(writePath)

This will first use hash-based partitioning to ensure that a limited number of values from COL make their way into each partition. Depending on the value you choose for numPartitions, some partitions may be empty while others may be crowded with values -- for anyone not sure why, read this. Then, when you call partitionBy on the DataFrameWriter, each unique value in each partition will be placed in its own individual file.

Warning: this approach can lead to lopsided partition sizes and lopsided task execution times. This happens when values in your column are associated with many rows (e.g., a city column -- the file for New York City might have lots of rows), whereas other values are less numerous (e.g., values for small towns).
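
For concreteness, a minimal sketch of pattern (1) -- the column name 'dt', the partition count, and the paths below are invented for the example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/path/to/input")        # hypothetical input path

# Hash-shuffle on 'dt' into 500 partitions; the writer then produces one
# file per distinct 'dt' value present in each partition.
(df.repartition(500, "dt")
   .write
   .partitionBy("dt")
   .parquet("/path/to/output"))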

(2) df.sort(sortCols).write.parquet(writePath)

This option works great when you want (1) the files you write to be of nearly equal sizes and (2) exact control over the number of files written. This approach first globally sorts your data and then finds splits that break up the data into k evenly-sized partitions, where k is specified in the spark config spark.sql.shuffle.partitions. This means that all rows with the same value of your sort key are adjacent to each other, but sometimes they'll span a split and end up in different files. Thus, if your use-case requires all rows with the same key to be in the same partition, don't use this approach.

There are two extra bonuses: (1) by sorting your data, its size on disk can often be reduced (e.g., sorting all events by user_id and then by time will lead to lots of repetition in column values, which aids compression) and (2) if you write to a file format that supports it (like Parquet) then subsequent readers can read data in optimally by using predicate push-down, because the parquet writer will write the MAX and MIN values of each column in the metadata, allowing the reader to skip rows if the query specifies values outside of the file's (min, max) range.

Note that sorting in Spark is more expensive than just repartitioning and requires an extra stage. Behind the scenes Spark will first determine the splits in one stage, and then shuffle the data into those splits in another stage.
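
As an illustrative sketch of approach (2) -- the column names, target file count, and path are invented, and the SparkSession and DataFrame from the earlier sketch are assumed -- the number of output files is controlled via the shuffle-partition setting:

# k sorted output files come from k shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 200)

(df.sort("user_id", "event_time")   # global range sort
   .write
   .parquet("/path/to/sorted_output"))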

(3) df.rdd.partitionBy(customPartitioner).toDF().write.parquet(writePath)

If you're using Spark in Scala, then you can write a custom partitioner, which can get over the annoying gotchas of the hash-based partitioner. Not an option in pySpark, unfortunately. If you really want to write a custom partitioner in pySpark, I've found this is possible, albeit a bit awkward, by using rdd.repartitionAndSortWithinPartitions:

(
    df.rdd
    .keyBy(sort_key_function)                    # convert rows to (key, row) pairs
    .repartitionAndSortWithinPartitions(
        numPartitions=N_WRITE_PARTITIONS,
        partitionFunc=part_func)
    .values()                                    # drop the keys again
    .toDF()
    .write.parquet(writePath)
)
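
For illustration only, sort_key_function, part_func, and N_WRITE_PARTITIONS could be defined along these lines (the column names and the partition count are hypothetical):

from pyspark.rdd import portable_hash

N_WRITE_PARTITIONS = 100    # hypothetical target number of output files

def sort_key_function(row):
    # Composite key: partition on 'city', sort by (city, timestamp)
    # within each partition.
    return (row["city"], row["timestamp"])

def part_func(key):
    # Send every row with the same city to the same partition.
    return portable_hash(key[0]) % N_WRITE_PARTITIONS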

Maybe someone else knows an easier way to use a custom partitioner on a dataframe in pyspark?

Mount answered 12/6, 2018 at 8:29 Comment(16)
Thank you @Mount ! A few questions about option #2. I still want it to save in folders with the COL values in the names, so can I just do df.sort(COL).write.partitionBy(COL).parquet(writePath)? If so, am I right that this would create different numbers of (roughly equally sized) files in each folder, based on how many rows have that value? One more question: If I do saveAsTable('tblnm') instead of parquet('path') will that have the same behavior (assuming my saveAsTable defaults to parquet, which it does)?Jolynnjon
@Jolynnjon Whenever you call DataFrameWriter.partitionBy(col) your partition/file sizes will be determined by the properties of the data in col, so you'll often end up with skewed file sizes. So, if you require folders with COL values in the names, and COL has some values that are much more frequent than others, then there's no way around having skewed file sizes. Why is that a requirement? Is it for efficient loading? If so, read my note about predicate pushdown on option (2) again.Mount
@Jolynnjon as for your second question about saveAsTable, read about the differences here. If you use saveAsTable you have even more options: you can first partition on one set of columns (such as date) and then bucket based on another set (such as user id), and you can choose to sort within buckets (which is cheaper than a global sort). Your table will be saved to HDFS at the default warehouse directory, which is configurable as a spark option.Mount
about the skewed file sizes and the named folders: it's kind of a weird use case where I'm using Spark for ETL and then using pandas.read_parquet() (on a different server) combined with s3fs to batch through pulling the files into memory and each batch is all the files with a given value of col. So if they're in different folders, organized by col, then it makes it much easier to do that. Like I said, ideally each folder would have an arbitrary number of similarly sized files, but I realize that may be hard/impossible to achieve.Jolynnjon
Is it feasible to divide the dataframe into multiple dfs (one df for each value of the column), change the number of partitions for each, and write them separately? This way you can control the number of files in each final folder.Suspicious
Yes @SinanErdem that's definitely possible. Your driver will have to loop through each individual value of the column though and create a new dataframe and job for each. So if you're not sure it'll always be a manageable number of values in your column to loop through, it's risky.Mount
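
Sketched out, that loop might look like the following (the column name, per-value file count, and output layout are invented; it launches one write job per distinct value, so it assumes low cardinality):

values = [r["city"] for r in df.select("city").distinct().collect()]

for v in values:
    n_files = 4    # could instead be derived from a per-value row count
    (df.filter(df["city"] == v)
       .repartition(n_files)
       .write
       .mode("overwrite")
       .parquet("/path/to/output/city={}".format(v)))
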
What about doing a repartition(x) followed by a sortWithinPartitions(cols) because the repartition will give you the number of files and sortWithinPartitions will keep the data sorted. Will predicate pushdown still work if you write this to Parquet?Nasal
@Nasal The answer is basically, 'no, it won't work very well'. Predicate pushdown on a column c works best when each parquet file contains a small range of the entire set of values of c. Thus, partitioning the dataframe directly after performing a global sort on c will allow predicate pushdown to work best, because each partition contains a small range of values in c. If you instead call df.repartition(numPartitions).sortWithinPartitions(cols), that will randomly shuffle values of c among all partitions, so each partition will contain a large range of values of c.Mount
@conradlee, that makes a lot of sense, thank you very much for answering my question! I was wondering if it's possible to control the number of files with the approach in (2). This was primarily the reason for doing a repartition.Nasal
df.repartition(col).write.partitionBy(numPartitions, cols).parquet(writePath) I get an error using numPartitions in this way, what am I missing?Geometer
@Geometer - Sorry, there was a mistake in that code. You choose the numPartitions in the repartition function. The partitionBy function (a method of the DataFrameWriter) doesn't accept a numPartitions argument. I've corrected the code in my answer--thanks for pointing this bug out!Mount
@Nasal Yes, to control the number of files in (2), the 'global sort approach', you set the spark.sql.shuffle.partitions configuration parameter to the number of partitions you want.Mount
OK, cool, thx, I thought I was losing it. But I have a second point: I assumed partitionBy writes at most one file per folder (with N blocks underneath). But someone told me you can get more than 1 file. I've never seen that. Your thoughts?Geometer
Thanks @conradlee, that is exactly what I neededNasal
@Geometer I think your question is 'How does spark's DataframeWriter.partitionBy option determine the number of files to write'? Can you write that as a stand-alone question in StackOverflow? I think it's too involved to go over that here in the comments.Mount
Yes indeed. But I came across someone who only wanted 1 file with low cardinality. That is clear to me, and I felt it was a bad thing given the small-files problem in HDFS.Geometer

df.repartition(COL).write.partitionBy(COL)

will write out one file per partition folder. This will not work well if one of your partitions contains a lot of data. e.g. if one partition contains 100GB of data, Spark will try to write out a 100GB file and your job will probably blow up.

df.repartition(2, COL).write.partitionBy(COL)

will write out a maximum of two files per partition, as described in this answer. This approach works well for datasets that are not very skewed (because the optimal number of files per partition is roughly the same for all partitions).

This answer explains how to write out more files for the partitions that have a lot of data and fewer files for the small partitions.
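
Putting the two variants side by side in a hedged sketch (column name, counts, and paths are invented; the salted form is a common way to spread a large partition over several files):

from pyspark.sql import functions as F

# Plain form: every 'country' value hashes to a single task, so each
# country folder typically ends up with one file.
(df.repartition(5, "country")
   .write.partitionBy("country")
   .parquet("/path/to/out_by_country"))

# Salted form: adding rand() spreads each country's rows across up to
# 5 tasks, so large countries can get up to 5 files in their folder.
(df.repartition(5, F.col("country"), F.rand())
   .write.partitionBy("country")
   .parquet("/path/to/out_by_country_salted"))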

Anthropolatry answered 20/10, 2019 at 5:48 Comment(1)
.repartition(2, COL) should be before .write, not after.Maurice
