I've got a fairly simple job coverting log files to parquet. It's processing 1.1TB of data (chunked into 64MB - 128MB files - our block size is 128MB), which is approx 12 thousand files.
Job works as follows:
val events = spark.sparkContext
.textFile(s"$stream/$sourcetype")
.map(_.split(" \\|\\| ").toList)
.collect{case List(date, y, "Event") => MyEvent(date, y, "Event")}
.toDF()
df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")
It collects the events with a common schema, converts to a DataFrame, and then writes out as parquet.
The problem I'm having is that this can create a bit of an IO explosion on the HDFS cluster, as it's trying to create so many tiny files.
Ideally I want to create only a handful of parquet files within the partition 'date'.
What would be the best way to control this? Is it by using 'coalesce()'?
How will that effect the amount of files created in a given partition? Is it dependent on how many executors I have working in Spark? (currently set at 100).
map
on yourRDD
– FlourishSpark
to write exactly 64 MB / 128 MB files? MySpark
job gives tiny (1-2 MB each) files (no of files = default = 200). I cannot simply invokerepartition(n)
to have approx 128 MB files each becausen
will vary greatly from one-job to another. – Popularly