Spark parquet partitioning : Large number of files

5

64

I am trying to leverage spark partitioning. I was trying to do something like

data.write.partitionBy("key").parquet("/location")

The issue here is that each partition creates a huge number of parquet files, which results in slow reads if I try to read from the root directory.

To avoid that I tried

data.coalesce(numPart).write.partitionBy("key").parquet("/location")

This, however, creates numPart parquet files in each partition. Now my partition sizes are different, so I would ideally like a separate coalesce per partition. This, however, doesn't look like an easy thing: I would need to visit every partition, coalesce it to a certain number of files, and store it at a separate location.

How should I use partitioning to avoid many files after write?

Rewarding answered 28/6, 2017 at 16:49 Comment(2)
have you read #44459855?Vassalize
I think what you are looking for is a way to dynamically scale the number of output files by the size of the data partition. I have a summary of how to accomplish this here, and a complete, self-contained demonstration here.Eaddy
95

First, I would really avoid using coalesce, as this is often pushed up further in the chain of transformations and may destroy the parallelism of your job (I asked about this issue here: Coalesce reduces parallelism of entire stage (spark)).

Writing 1 file per parquet-partition is relatively easy (see Spark dataframe write method writing many small files):

data.repartition($"key").write.partitionBy("key").parquet("/location")

If you want to set an arbitrary number of files (or files which all have the same size), you need to further repartition your data using another suitable attribute (I cannot tell you what this might be in your case):

data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")

another_key could be another attribute of your dataset, or a derived attribute using some modulo or rounding operations on existing attributes. You could even use window functions with row_number over key and then round this by something like

data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")

This would put N records into 1 parquet file.
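
For completeness, here is a sketch of that row_number variant. The ordering column (assumed here to be "id"), the value of N, and the path are placeholders, not something given in the question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{floor, row_number}

// Sketch only: number the rows within each key (ordered by an assumed "id" column),
// then send every block of N consecutive rows to the same shuffle partition so that
// each parquet file holds roughly N records.
val N = 10000
val withRowNumber = data.withColumn(
  "row_number",
  row_number().over(Window.partitionBy($"key").orderBy($"id"))
)

withRowNumber
  .repartition($"key", floor($"row_number" / N) * N)
  .write.partitionBy("key").parquet("/location")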

using orderBy

You can also control the number of files without repartitioning by ordering your dataframe accordingly:

data.orderBy($"key").write.partitionBy("key").parquet("/location")

This will lead to a total of (at least, but not much more than) spark.sql.shuffle.partitions files across all partitions (by default 200). It's even beneficial to add a second ordering column after $key, as parquet will remember the ordering of the dataframe and will write the statistics accordingly. For example, you can order by an ID:

data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")

This will not change the number of files, but it will improve the performance when you query your parquet file for a given key and id. See e.g. https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide and https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example
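
As an illustration (a sketch only; the literal key and id values are made up), a read like the following can then skip whole partition directories via key and whole row groups via the min/max statistics on id:

// Partition pruning on "key" plus row-group skipping via the "id" statistics.
spark.read.parquet("/location")
  .filter($"key" === "someKey" && $"id" === 42)
  .show()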

Spark 2.2+

From Spark 2.2 on, you can also play with the new option maxRecordsPerFile to limit the number of records per file if your files are too large. You will still get at least N files if you have N partitions, but you can split the file written by 1 partition (task) into smaller chunks:

df.write
  .option("maxRecordsPerFile", 10000)
  ...

See e.g. http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/ and spark write to disk with N files less than N partitions
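
For reference, a hedged sketch of combining this option with the repartition approach from above (the 10000-record threshold is just a placeholder):

// One shuffle partition (and thus one writing task) per key, but the writer
// splits any output that would exceed 10000 records into additional files.
data
  .repartition($"key")
  .write
  .option("maxRecordsPerFile", 10000)
  .partitionBy("key")
  .parquet("/location")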

Vassalize answered 28/6, 2017 at 19:3 Comment(6)
Even after including repartition and partitionBy I still see only one worker saving the parquet file : see #51050772Abb
First I would really avoid using coalesce, as this is often pushed up further in the chain of transformation and may destroy the parallelism of your job (I asked about this issue here : How to prevent Spark optimization) - Wasn't one of the main points in @viirya 's answer to your question that this does NOT happen?Pigg
Great answer but I'm not sure why you would want to avoid coalesce. The "narrow dependency" of coalesce will avoid a shuffle, which is a good thing, and @Pigg is right, that marked answer from viirya does say that it doesn't get pushed up the chain. It's not good advice for most people in most cases to actively prevent spark optimization, especially by introducing shuffling.Fried
Hmm on second reading it does suggest that the UDF execution happens on fewer nodes due to the coalesce. I still think in many cases the coalesce avoiding shuffling will be beneficial, and you could always use some other stage-separating action upstream like a reduceByKey.Fried
A more thorough answer is at https://mcmap.net/q/271310/-partitioning-a-large-skewed-dataset-in-s3-with-spark-39-s-partitionby-methodTroche
I had to specify the number of partitions explicitly to get this to work. Also, I used floor(rand() * num_files_per_partition) as my second partition field in lieu of a row_id, though a row_id would be great for repeatabilityOniskey
13

Let's expand on Raphael Roth's answer with an additional approach that'll create an upper bound on the number of files each partition can contain, as discussed in this answer:

import org.apache.spark.sql.functions.rand

df.repartition(numPartitions, $"some_col", rand)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")
Orleans answered 21/10, 2019 at 20:56 Comment(0)
10

The other answers here are very good but have some problems:

  • Relying on maxRecordsPerFile to break up large partitions into smaller files is very handy but comes with two caveats:

    1. If your partitioning columns are heavily skewed, repartitioning by them means potentially moving all the data for the largest data partition into a single DataFrame partition. If that DataFrame partition gets too large, that alone may crash your job.

      To give a simple example, imagine what repartition("country") would do for a DataFrame that had 1 row for every person in the world.

    2. maxRecordsPerFile will ensure that your output files don't exceed a certain number of rows, but only a single task will be able to write out these files serially. One task will have to work through the entire data partition, instead of being able to write out that large data partition with multiple tasks.

  • repartition(numPartitions, $"some_col", rand) is an elegant solution but does not handle small data partitions well. It will write out numPartitions files for every data partition, even if they are tiny.

    This may not be a problem in many situations, but if you have a large data lake you know that writing out many small files will kill the performance of your data lake over time.

So one solution doesn't play well with very large data partitions, and the other doesn't play well with very small data partitions.

What we need is a way to dynamically scale the number of output files by the size of the data partition. If it's very large, we want many files. If it's very small, we want just a few files, or even just one file.

The solution is to extend the approach using repartition(..., rand) and dynamically scale the range of rand by the desired number of output files for that data partition.

Here's the essence of the solution I posted on a very similar question:

from pyspark.sql.functions import rand

# In this example, `id` is a column in `skewed_data`.
partition_by_columns = ['id']
desired_rows_per_output_file = 10

partition_count = skewed_data.groupBy(partition_by_columns).count()

partition_balanced_data = (
    skewed_data
    .join(partition_count, on=partition_by_columns)
    .withColumn(
        'repartition_seed',
        (
            rand() * partition_count['count'] / desired_rows_per_output_file
        ).cast('int')
    )
    .repartition(*partition_by_columns, 'repartition_seed')
)

This will balance the size of the output files, regardless of partition skew, and without limiting your parallelism or generating too many small files for small partitions.

If you want to run this code yourself, I've provided a self-contained example, along with proof that the DataFrame partitions are being balanced correctly.

Eaddy answered 24/12, 2020 at 4:9 Comment(2)
it is super nice!! thank you @Nick Chammas . If I want to create the partitions sorted by 'id' , what should I do? ie: id is sorted as 1,2,3,4,5 with different amounts of counts. when I run this, repartition_seed is 1000 for 1 and 3 ids, and 2000 for 2 and 4 ids. However, I'd like to have repartition_seed 1000 for 1 and 2, 2000 for 3, and 3000 for 4 and 5 etc.Grime
@Grime - How many rows correspond to each value of id? My solution is meant to balance the number of output files based on how many rows there are. So if id=2 has more rows than id=1, then it will get more output files. That's by design.Eaddy
7

This is working for me very well:

data.repartition(n, "key").write.partitionBy("key").parquet("/location")

It produces n files in each output partition (directory), and is (anecdotally) faster than using coalesce and (again, anecdotally, on my data set) faster than only repartitioning on the output.

If you're working with S3, I also recommend doing everything on local drives (Spark does a lot of file creation/rename/deletion during write-outs) and, once it's all settled, use Hadoop's FileUtil (or just the AWS CLI) to copy everything over:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession
// ...
def copy(in: String, out: String, sparkSession: SparkSession) = {
  FileUtil.copy(
    FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
    new Path(in),
    FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
    new Path(out),
    false,
    sparkSession.sparkContext.hadoopConfiguration
  )
}
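
Hypothetical usage (the staging and S3 paths below are made-up placeholders), once the Spark write has finished and `spark` is your active SparkSession:

// Copy the finished local output to S3 in one pass.
copy("file:///tmp/staging/location", "s3a://my-bucket/location", spark)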

Edit: As per discussion in comments:

You have a dataset with a partition column of YEAR, but each given YEAR has a vastly different amount of data in it. So, one year might have 1GB of data, but another might have 100GB.

Here's a sketch of one way to handle this:

val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct.collect().map(_.get(0))
yearValues.foreach { yearVal =>
  val subDf = df.filter(s"YEAR = $yearVal")
  val numPartitionsToUse = math.max(1, (subDf.count / partitionSize).toInt)
  subDf.repartition(numPartitionsToUse).write.parquet(s"$outputPath/year=$yearVal")
}

But, I don't actually know whether this will work. It's possible that Spark will have an issue reading in a variable number of files per column partition.

Another way to do it would be to write your own custom partitioner, but I have no idea what's involved in that, so I can't supply any code.

Presidentship answered 4/12, 2018 at 19:38 Comment(10)
@Raphael Roth , thank for quite good info , I have a scenario where my company has data yearly and quarterly wise for last 20 years. As the company growing data grown yoy. So some earlier yearly wise data is in few hundred records , but recent years data is into millions of records . How should/can I partition this data evenly ? so that all parquet files more or less same amount of data/size. Please suggest ...thanksDescombes
Okay, hmm. Let's say twenty years ago you have 10MB of data for the year. Ten years ago you had 10GB, and this year you had 10TB. Let's say you want each partition file to be 100MB.Presidentship
AFAIK, if you partition by a column (say, year) and then into N files, each you end up with D*N files, where D is the number of partitions you get from the column partition. So, if the partition column is a year, and you have twenty years of data, D is twenty.Presidentship
But, you wouldn't then get files of even size over the years, since the 10MB files will go into N smaller files, as will the 10TB. AKA, if N is 10, you'll get 10 1MB files for the really old data, but 10 1TB files for the newest data.Presidentship
You could maybe work something out where you manually partitioned each column partition into different numbers of files; say, split the dataframe into multiple dataframes (one per year) and then write each one out seperately - I think you'd be able to read that in just fine.Presidentship
Oh. Yeah, you'd need to write a custom partitioner that does this. I don't know what would go into that.Presidentship
@ Narfanator, thank you so much , as it has a skewed data , i would like to write one year as on group and partition those into N , each group at a time i.e. saveAsParquet ....how to do it ? in java we can get distinct group id , for loop group ids , select each group data using where condition and save as parquet file , how to do it in spark ?Descombes
The same, only in a different syntax. var someYear = df.filter("year = XXXX"); someYear.repartition(N); someYear.write.parquet("...")Presidentship
I have data as ID , YEAR , QUARTER like 1) 3030 1998 3 2) 3030 1993 3 3) 3030 1995 3 4) 3030 1999 2 5) 3030 1996 2 6) 3030 1996 1 Respective group datafame should be write to parquet file .... currently I have a method like writeAsParquet(df, parquet_file , parquetWriteMode , parquetPartitionColumns) ... how to do this ?Descombes
@ Narfanator we dont know year prior then?Descombes
0

These answers are great in theory, but PySpark is seriously broken as far as I can tell. The only thing that seems to work is to use both the number AND a synthetic splitter column, F.floor(F.rand() * num_files_per_partition). Also, setting the number of partitions does nothing. This is on Databricks, running on two r4.2xlarge executors with Databricks Runtime 12.2 ML (Apache Spark 3.3.2).


import os
import pandas as pd
import numpy as np

from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit

mk_temp_path = lambda s: os.path.join('s3://<my path>', s)
N = 10000
num_part = 10
num_file = 4

def ls_recursive(path, depth=0):
    fs = dbutils.fs.ls(path)
    maxdepth=2
    if depth > maxdepth:
        print(f"reached max depth > {maxdepth} at {path}")
        return []
    return [
        y
        for x in fs
        for y in (ls_recursive(x.path, depth + 1) if (x.size == 0 and x.path != path) else [x.path])
    ]

df = spark.createDataFrame(pd.DataFrame({"id":np.arange(N), "x":np.random.rand(N)})).withColumn('hp', F.abs(F.hash('id')%num_part))

fn='basic_write'
df.write.mode('overwrite').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in dbutils.fs.ls(mk_temp_path(fn)) if x.path.endswith('parquet')]))

fn='partition_write'
df.write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))

fn='repartition_col'
df.repartition('hp').write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))

fn='repartition_num'
df.repartition(num_part*num_file).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))

fn='repartition_colsplit'
df.repartition(col('hp'), F.floor(F.rand()*lit(num_file))).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))

fn='repartition_numcol'
df.repartition(num_part*num_file, col('hp')).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))

fn='repartition_numcolsplit'
df.repartition(num_part*num_file, col('hp'), F.floor(F.rand()*lit(num_file))).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))

fn='repartition_numcolrand'
df.repartition(num_part*num_file, col('hp'), F.rand()).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))

fn='repart_file_only_BAD'
df.repartition(num_file).write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))



spark.conf.set("spark.sql.shuffle.partitions", 32)
df = spark.createDataFrame(pd.DataFrame({"id":np.arange(N), "x":np.random.rand(N)})).withColumn('hp', F.abs(F.hash('id')%num_part))

fn='basic_write_32'
df.write.mode('overwrite').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in dbutils.fs.ls(mk_temp_path(fn)) if x.path.endswith('parquet')]))

fn='partition_write_32'
df.write.mode('overwrite').partitionBy('hp').parquet(mk_temp_path(fn))
print(f"{fn}:", len([x for x in ls_recursive(mk_temp_path(fn)) if x.endswith('parquet')]))
Output:

basic_write: 16
partition_write: 236
repartition_col: 10
repartition_num: 400
repartition_colsplit: 10
repartition_numcol: 10
repartition_numcolsplit: 39
repartition_numcolrand: 400
repart_file_only_BAD: 40
basic_write_32: 16
partition_write_32: 160
Oniskey answered 6/6, 2023 at 23:18 Comment(0)
