Spark dataframe write method writing many small files
A

7

23

I've got a fairly simple job converting log files to parquet. It processes 1.1TB of data (chunked into 64MB - 128MB files; our block size is 128MB), which is approximately 12 thousand files.

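The job works as follows:

 import org.apache.spark.sql.SaveMode
 import spark.implicits._ // needed for .toDF()

 // Parse each line, keep only well-formed "Event" records, and convert to a DataFrame.
 // RDD.collect with a partial function is a filter + map, not an action.
 val df = spark.sparkContext
  .textFile(s"$stream/$sourcetype")
  .map(_.split(" \\|\\| ").toList)
  .collect { case List(date, y, "Event") => MyEvent(date, y, "Event") }
  .toDF()

 df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")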

It collects the events with a common schema, converts to a DataFrame, and then writes out as parquet.

The problem I'm having is that this can create a bit of an IO explosion on the HDFS cluster, as it's trying to create so many tiny files.

Ideally I want to create only a handful of parquet files within the partition 'date'.

What would be the best way to control this? Is it by using 'coalesce()'?

How will that affect the number of files created in a given partition? Does it depend on how many executors I have working in Spark (currently set at 100)?

Asphyxia answered 9/6, 2017 at 13:40 Comment(3)
Not related to the question, but you should not collect your data (first statement); rather, use map on your RDD. - Flourish
@RaphaelRoth this collect is different. This is more like filter -> map github.com/apache/spark/blob/v2.1.1/core/src/main/scala/org/… - Parthenogenesis
@user3030878 how did you get Spark to write exactly 64 MB / 128 MB files? My Spark job gives tiny (1-2 MB each) files (number of files = default = 200). I cannot simply invoke repartition(n) to get files of approximately 128 MB each because n will vary greatly from one job to another. - Popularly
F
18

You have to repartition your DataFrame to match the partitioning of the DataFrameWriter.

Try this:

df
.repartition($"date")
.write.mode(SaveMode.Append)
.partitionBy("date")
.parquet(s"$path")
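
To see why this helps, here is a rough plain-Python model (not Spark itself) of how many files a partitionBy("date") write produces. It assumes each write task emits one file per distinct date it holds; the helper names and the toy data are made up for illustration, and crc32 is only a stand-in for Spark's hash partitioner.

```python
import zlib

def count_output_files(partitions):
    """One file per (write task, distinct date) pair, as with partitionBy("date")."""
    return sum(len({row["date"] for row in rows}) for rows in partitions)

def repartition_by_date(rows, num_partitions):
    """Hash rows into buckets by date; a stand-in for repartition($"date")."""
    buckets = [[] for _ in range(num_partitions)]
    for row in rows:
        index = zlib.crc32(row["date"].encode()) % num_partitions
        buckets[index].append(row)
    return buckets

# Toy data: 10,000 rows spread over 10 dates.
dates = [f"2017-06-{day:02d}" for day in range(1, 11)]
rows = [{"date": dates[(i // 100) % 10], "y": i} for i in range(10_000)]

# 100 input partitions, each holding rows from every date.
unshuffled = [rows[i::100] for i in range(100)]
# After the shuffle, all rows of a given date sit in exactly one partition.
shuffled = repartition_by_date(rows, 100)

files_before = count_output_files(unshuffled)
files_after = count_output_files(shuffled)
print(files_before, files_after)
```

In this model the unshuffled write produces 1000 files (100 tasks x 10 dates) and the repartitioned write produces 10 (one per date), which is the effect the snippet above is after.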
Flourish answered 9/6, 2017 at 14:5 Comment(4)
How can I read parquet files from multiple root directories and merge them? - Chaw
@Raphael Doesn't partitionBy(date) reduce the partitions down to the count of distinct dates? I think the DF has the same number of partitions as the distinct date count. Am I correct? - Windgall
I also noted in the past that df.repartition(n, $"date") gives the same result. Is that just by chance? In general I find the API interface not so logical. - Robtrobust
@Robtrobust This behavior is explained in the PySpark docs: if both n and column parameters are specified in the repartition() method then only columns will be considered. "Changed in version 1.6: Added optional arguments to specify the partitioning columns. Also made numPartitions optional if partitioning columns are specified." Source: spark.apache.org/docs/3.1.3/api/python/reference/api/… - Writeoff
T
6

In Python you can rewrite Raphael Roth's answer as:

(df
  .repartition("date")
  .write.mode("append")
  .partitionBy("date")
  .parquet("{path}".format(path=path)))

You might also consider adding more columns to .repartition to avoid problems with very large partitions:

(df
  .repartition("date", "another_column", "yet_another_column")  # placeholder column names
  .write.mode("append")
  .partitionBy("date")
  .parquet("{path}".format(path=path)))
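
As a rough illustration of the "very large partitions" problem, the plain-Python sketch below (again a model, not Spark) hashes skewed rows on one key column versus two and compares the largest resulting partition. The "host" column, the row counts, and the crc32 hash are all assumptions made up for the example.

```python
import zlib
from collections import Counter

def partition_sizes(rows, keys, num_partitions):
    """Rows per partition after hash-partitioning on the given key columns."""
    sizes = Counter()
    for row in rows:
        key = "|".join(str(row[k]) for k in keys).encode()
        sizes[zlib.crc32(key) % num_partitions] += 1
    return sizes

# Skewed toy data: one date holds 9,000 of the 10,000 rows.
# "host" is a hypothetical second column with 8 distinct values.
rows = [{"date": "2017-06-01", "host": f"h{i % 8}"} for i in range(9_000)]
rows += [{"date": "2017-06-02", "host": f"h{i % 8}"} for i in range(1_000)]

by_date = partition_sizes(rows, ["date"], 100)
by_date_and_host = partition_sizes(rows, ["date", "host"], 100)

largest_by_date = max(by_date.values())
largest_by_date_and_host = max(by_date_and_host.values())
print(largest_by_date, largest_by_date_and_host)
```

Partitioning on date alone puts every row of the busy date into a single partition (and so a single task and file), while adding the second column spreads that date over up to 8 partitions. The trade-off is that each date directory then receives up to that many files.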
Twerp answered 17/10, 2018 at 14:16 Comment(0)
P
4

The simplest solution would be to replace your current partitioning with:

df
 .repartition(to_date($"date"))
 .write.mode(SaveMode.Append)
 .partitionBy("date")
 .parquet(s"$path")

You can also partition your DataFrame more finely, e.g. by day and perhaps by hour or an hour range, and then be less precise for the writer. Which is best depends on the amount of data.

You can reduce entropy by repartitioning the DataFrame and then writing with a partitionBy clause.

Parthenogenesis answered 9/6, 2017 at 14:20 Comment(0)
C
3

I ran into the same issue, and using coalesce solved my problem.

df
  .coalesce(3) // number of parts/files 
  .write.mode(SaveMode.Append)
  .parquet(s"$path")

For more information on using coalesce or repartition, you can refer to the following question: Spark: coalesce or repartition

Changeless answered 21/7, 2017 at 1:7 Comment(0)
C
2

Duplicating my answer from here: https://mcmap.net/q/271484/-spark-parquet-partitioning-large-number-of-files

This is working for me very well:

data.repartition(n, "key").write.partitionBy("key").parquet("/location")

It produces N files in each output partition (directory), and is (anecdotally) faster than using coalesce and (again, anecdotally, on my data set) faster than only repartitioning on the output.

If you're working with S3, I also recommend doing everything on local drives (Spark does a lot of file creation/rename/deletion during write outs) and once it's all settled use hadoop FileUtil (or just the aws cli) to copy everything over:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession
// ...
  // Copies `in` to `out` (possibly across file systems) using the session's Hadoop configuration.
  def copy(in: String, out: String, sparkSession: SparkSession): Boolean = {
    val conf = sparkSession.sparkContext.hadoopConfiguration
    FileUtil.copy(
      FileSystem.get(new URI(in), conf),
      new Path(in),
      FileSystem.get(new URI(out), conf),
      new Path(out),
      false, // deleteSource: keep the source files
      conf
    )
  }
Coulee answered 4/12, 2018 at 19:41 Comment(1)
What is the difference between data.repartition(n, "key") and data.repartition(n)? - Kunz
K
0

How about running a script like this as a map job that consolidates all the parquet files into one:

$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
 -Dmapred.reduce.tasks=1 \
 -input "/hdfs/input/dir" \
 -output "/hdfs/output/dir" \
 -mapper cat \
 -reducer cat
Katonah answered 15/5, 2019 at 15:3 Comment(0)
I
0

For PySpark Users,

I recommend using RepartiPy to easily handle the small file problem.

import repartipy

desired_partition_size_in_bytes = 134217728  # 128 MiB

# Use this if you have enough (executor) memory to cache the whole DataFrame.
# If you do NOT have enough memory (i.e. the DataFrame is too large), use 'repartipy.SamplingSizeEstimator' instead.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    N = se.get_desired_partition_count(desired_partition_size_in_bytes=desired_partition_size_in_bytes)
    se.reproduce().repartition(N).write.mode("append").partitionBy("date").parquet(f"{path}")
    # or
    se.reproduce().coalesce(N).write.mode("append").partitionBy("date").parquet(f"{path}")

The code above calculates N such that each partition holds roughly 128 MiB after the repartition. Please see the docs or blog post for more details.
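
The arithmetic behind such an N is essentially a ceiling division of the total data size by the target partition size. The sketch below is only an illustration of that idea, not RepartiPy's implementation; the function name is hypothetical, and the 1.1 TB figure from the question is taken as 1.1 x 10^12 bytes of input text, which will differ from the size of the resulting parquet output.

```python
def desired_partition_count(total_size_in_bytes, desired_partition_size_in_bytes):
    """Smallest partition count that keeps every partition at or below the target size."""
    # Ceiling division with integers only; always return at least one partition.
    count = -(-total_size_in_bytes // desired_partition_size_in_bytes)
    return max(1, count)

MIB = 1024 * 1024

# Assumption: 1.1 TB of input, read as 1.1 * 10**12 bytes.
n = desired_partition_count(1_100_000_000_000, 128 * MIB)
print(n)
```

With a per-job estimate of the data size, the same calculation can feed repartition(n) when n varies greatly from one job to another.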

If you want a similar approach for Scala or Java, you can use the RepartiPy source code as a reference for writing your own utilities.

Irreligion answered 2/7, 2024 at 15:25 Comment(0)
