A similar question has been asked here, but it does not properly address my problem. I have nearly 100 DataFrames, each with at least 200,000 rows, and I need to combine them with a full join on the column ID, producing a DataFrame with the columns ID, Col1, Col2, Col3, Col4, Col5, ..., Col102.
Just for illustration, the structure of my DataFrames -
df1 =                          df2 =            df3 =            .....  df100 =
+----+------+------+------+    +----+------+    +----+------+           +----+------+
|  ID|  Col1|  Col2|  Col3|    |  ID|  Col4|    |  ID|  Col5|           |  ID|Col102|
+----+------+------+------+    +----+------+    +----+------+           +----+------+
| 501|  25.1|  34.9| 436.9|    | 501| 22.33|    | 503| 22.33|           | 501|  78.1|
| 502|  12.2|3225.9|  46.2|    | 502| 645.1|    | 505| 645.1|           | 502|  54.9|
| 504| 754.5| 131.0| 667.3|    | 504| 547.2|    | 504| 547.2|           | 507|     0|
| 505|324.12| 48.93|  -1.3|    | 506|     2|    | 506|     2|           | 509| 71.57|
| 506| 27.51| 88.99|  67.7|    | 507| 463.7|    | 507| 463.7|           | 510|  82.1|
  .                              .                .                       .
  .                              .                .                       .
+----+------+------+------+    +----+------+    +----+------+           +----+------+
I started joining these DataFrames by doing a full join on all of them sequentially. Naturally, this is a computationally intensive procedure, and one must strive to reduce the number of shuffles across the worker nodes. Therefore, I started by partitioning the DataFrame df1 on ID using repartition(), which hash-partitions the DataFrame on ID into 30 partitions -
df1 = df1.repartition(30,'ID')
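(Just to sanity-check the repartitioning, the partition count can be inspected with the standard RDD API; a minimal check -)
print(df1.rdd.getNumPartitions())   # should print 30 after repartition(30, 'ID')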
Now, I do a full join between df1 and df2 -
df = df1.join(df2,['ID'],how='full')
df.persist()
Since df1 was already hash-partitioned on ID, I had expected that this join would skip the shuffle and maintain the partitioner of df1, but I notice that a shuffle did take place and that it increased the number of partitions of df to 200.
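(For reference, the partition count and the shuffle setting can be checked like this; the 200 I see matches the default value of spark.sql.shuffle.partitions. A minimal sketch -)
print(df.rdd.getNumPartitions())                        # 200 after the join
print(spark.conf.get("spark.sql.shuffle.partitions"))   # defaults to 200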
Now, if I keep joining the subsequent DataFrames via a function like the one shown below, I eventually get the error java.io.IOException: No space left on device -
def rev(df, num):
    df_temp = spark.read.load(filename + str(num) + '.csv')
    df_temp.persist()
    df = df.join(df_temp, ['ID'], how='full')
    df_temp.unpersist()
    return df
df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the error below when I call the first action, count() -
print("Total number of rows: "+str(df.count()))
df.unpersist() # Never reached this stage.
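(Before count() is triggered, the accumulated plan of the chained joins can also be inspected with the standard explain() call; I mention it only as a way to see how many shuffle exchanges have piled up -)
df.explain()   # prints the physical plan, including every Exchange (shuffle) step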
Update: Error message -
Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
Questions:
1. Why was the partitioner of df1 not maintained when we did the first join?
2. How can I join these multiple tables efficiently and also avoid the No space left on device issue? User @silvio here suggests using .bucketBy(), but he also alluded to the partitioner being maintained, which did not happen. So I am not sure what an efficient way of joining these multiple DataFrames would be.
Any suggestions/hints would be much appreciated.
Comments:
You could try the coalesce() method on each joined DataFrame, to maintain a smaller number of partitions (coalesce vs. repartition). – Design
In pandas this would just be df = df1; df = df.set_index('ID'); df2 = df2.set_index('ID'); df['col4'] = df2['col4'], ... etc. Wish someone could add this into PySpark. – Giro
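(My reading of the coalesce() suggestion above, as a minimal sketch; applied after each join it would shrink the result back down without a full shuffle -)
df = df.join(df_temp, ['ID'], how='full').coalesce(30)   # coalesce only merges partitions, it never shuffles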