Efficient way of joining multiple tables in Spark - No space left on device

A similar question has been asked here, but it does not address my question properly. I have nearly 100 DataFrames, each with at least 200,000 rows, and I need to combine them with a full join on the column ID, producing a DataFrame with columns ID, Col1, Col2, Col3, Col4, Col5, ..., Col102.

Just for illustration, the structure of my DataFrames -

df1 =                          df2 =            df3 =          .....  df100 = 
+----+------+------+------+    +----+------+    +----+------+         +----+------+
|  ID|  Col1|  Col2|  Col3|    |  ID|  Col4|    |  ID|  Col5|         |  ID|Col102|
+----+------+------+------+    +----+------+    +----+------+         +----+------+
| 501|  25.1|  34.9| 436.9|    | 501| 22.33|    | 503| 22.33|         | 501|  78.1|
| 502|  12.2|3225.9|  46.2|    | 502| 645.1|    | 505| 645.1|         | 502|  54.9|
| 504| 754.5| 131.0| 667.3|    | 504| 547.2|    | 504| 547.2|         | 507|     0|
| 505|324.12| 48.93|  -1.3|    | 506|     2|    | 506|     2|         | 509| 71.57|
| 506| 27.51| 88.99|  67.7|    | 507| 463.7|    | 507| 463.7|         | 510|  82.1|
.
.
+----+------+------+------+    +----+------+    +----+------+         +----+------+

I started joining these DataFrames by doing a full join on all of them sequentially. Naturally, this is a computationally intensive procedure, and one must strive to reduce the number of shuffles across the worker nodes. Therefore, I started by partitioning the DataFrame df1 on ID using repartition(), which hash-partitions the DataFrame by ID into 30 partitions -

df1 = df1.repartition(30,'ID')

Now, I do a full join between df1 and df2.

df = df1.join(df2,['ID'],how='full')
df.persist()

Since df1 was already hash-partitioned, I had expected that this join would skip the shuffle and keep the partitioner of df1, but I notice that a shuffle did take place and that it increased the number of partitions of df to 200. Now, if I keep joining the subsequent DataFrames by calling them via a function like the one shown below, I get the error java.io.IOException: No space left on device -

def rev(df,num):
     df_temp = spark.read.load(filename+str(num)+'.csv')
     df_temp.persist()
     df = df.join(df_temp,['ID'],how='full')
     df_temp.unpersist()
     return df

df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() - 
print("Total number of rows: "+str(df.count()))
df.unpersist()  # Never reached this stage.
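For reference, this is roughly how I am inspecting the partition counts, plus the shuffle-partition setting I could tweak (just a sketch, not part of the failing job itself; only the values 30 and 200 are from my run):

print(df1.rdd.getNumPartitions())   # 30, after the explicit repartition on ID
print(df.rdd.getNumPartitions())    # 200, i.e. the default spark.sql.shuffle.partitions

# One knob I could change, if that is the right direction:
spark.conf.set('spark.sql.shuffle.partitions', 30)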

Update: Error message -

Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)

Questions:

1. Why was the partitioner of df1 not maintained when we did the first join?

2. How can I join these multiple tables efficiently and also avoid the No space left on device issue? User @silvio here suggests using .bucketBy(), but he also alluded to the partitioner being maintained, which did not happen (my rough understanding of that approach is sketched below). So I am not sure what an efficient way of joining these multiple DataFrames would be.
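For reference, this is my rough understanding of the .bucketBy() idea (a sketch with made-up table names; I do not have this running):

# Write each source table bucketed and sorted on ID, so that a later
# join on ID between the bucketed tables can avoid a full shuffle.
(df1.write
    .bucketBy(30, 'ID')
    .sortBy('ID')
    .mode('overwrite')
    .saveAsTable('bucketed_table1'))

(df2.write
    .bucketBy(30, 'ID')
    .sortBy('ID')
    .mode('overwrite')
    .saveAsTable('bucketed_table2'))

df1_b = spark.table('bucketed_table1')
df2_b = spark.table('bucketed_table2')
df = df1_b.join(df2_b, ['ID'], how='full')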

Any suggestions/hints will be very much appreciated.

Scofield answered 14/3, 2019 at 13:51 Comment(3)
Try to use the coalesce() method on each joined DataFrame to maintain a smaller number of partitions. coalesce_repartitionDesign
Not the answer to the original question, but with only 200,000 rows you can do this in pandas in a second: df = df1; df = df.set_index('ID'); df2 = df2.set_index('ID'); df['col4'] = df2['col4'], ... etc. I wish someone would add this to PySpark.Giro
Well, it was just an example... the idea is to understand how Spark distributes work on the cluster and how load balancing can be accomplished efficiently.Scofield

First, try to persist your big df every N iterations with a for loop (which you probably already have).

Second, try to control the default shuffle partition number by setting sqlContext.sql("set spark.sql.shuffle.partitions=100") instead of 200, which is the default.

Your code should look like:

num_joins = 100        # total number of DataFrames to join
persist_interval = 10  # persist the accumulated result every N joins

big_df = spark.createDataFrame(...)  # empty df

for i in range(1, num_joins + 1):
    big_df = big_df.join(df, ....)

    if i % persist_interval == 0:
        big_df = big_df.persist()

Here I call persist every 10 iterations; you can of course adjust that number according to the behavior of your job.

EDIT: In your case you are persisting the local df_temp inside the rev function, but not the whole DataFrame that contains all the previous joins (df in your case). Since it is a local persist, it will have no effect on the final execution plan. As for my suggestion, let's assume you need 100 joins in total; with the code above you would iterate through a loop [1..100] and persist the accumulated result every 10 iterations. After persisting the big DataFrame, the DAG will contain fewer in-memory calculations, since the intermediate steps are stored and Spark knows how to restore them from storage instead of recalculating everything from scratch.
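To make this concrete with your rev() setup, a rough sketch could look like the following (assuming your tables are numbered 1 to 100, and reusing the filename variable and spark.read.load call from your snippet):

from pyspark.storagelevel import StorageLevel

persist_interval = 10

df = spark.read.load(filename + '1.csv')   # start from the first table

for i in range(2, 101):                    # tables 2..100
    df_temp = spark.read.load(filename + str(i) + '.csv')
    df = df.join(df_temp, ['ID'], how='full')

    if i % persist_interval == 0:
        # persist the accumulated result (not only df_temp) and trigger an
        # action so the intermediate result is actually materialised
        df = df.persist(StorageLevel.MEMORY_AND_DISK)
        df.count()

The extra count() forces the persisted intermediate result to be materialised, so the later joins read it from storage instead of recomputing the whole chain of joins from scratch.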

Oxyhydrogen answered 15/3, 2019 at 0:4 Comment(13)
Many thanks for your answer. I just updated my question to reflect exactly how I am approaching this problem. I am loading df2 to df100 via a function and then performing the join on the main DataFrame inside the function itself and returning it. I am persisting the main DataFrame throughout, but I get an error before unpersist() on the main DataFrame gets called. I am a bit confused about doing persist every 10 DataFrames. Can you kindly elaborate on that, taking the update I made to the question into account? Thanks a lot.Scofield
Hello @cph_sto, first note that your error (TID 8755, localhost, executor driver): java.io.IOException: No space left on device is telling you that there is no space left on your driver node! Second, I have updated my answer with more details on how this approach should work and what the benefits will be.Oxyhydrogen
Hi Alexandros, many thanks for your reply. I was persisting the main DataFrame as well, as mentioned in my post (df.persist()). Let me try the 10-interval persist you just mentioned.Scofield
Hello, I tried the persist-every-10 technique you mentioned, but got the same error message. I decreased the interval size to 5 as well, with the same result. I took the main DataFrame df and, once it had 10 tables joined, I persisted it with df.persist(); then I joined the 11th to the 20th table and called df.persist() again ... and did the same up to table 100. I still got the error.Scofield
Hi @Scofield, did you fix the storage error already? Without fixing that one you shouldn't try anything else, because it will not work without available storage.Oxyhydrogen
Hi, sorry, I did not. I presume that it requires administrative privileges, which I don't have. I am on Jupyter+Spark, and I read here that I need to set SPARK_LOCAL_DIRS in SPARK_HOME/conf/spark_defaults.conf. Let me try to get some more space. I will try your technique after that and inform you. Many thanks Alexandros for your kind help :)Scofield
@Alexandros: Referring to #55657259, is there a way to avoid the join by appending sorted columns together? All my tables df_x have the same number of rows and the same ID column. Instead of joining and hurting performance, can we just sort them and then append them, keeping the sorted order?Newspaperman
Hello @Scofield, did that finally work for your problem?Oxyhydrogen
Hello @AlexandrosBiratsis, nice to hear from you. Well, I have requested the administrator to increase the space on ‘temp’, and it's taking some time. I will definitely keep you informed as soon as I implement your suggestions. Once again, thanks for asking for feedback. Very appreciated. :)Scofield
Sure @Scofield let me know and greetingsOxyhydrogen
@Alexandros Biratsis. A vote up for you. I have also followed a similar strategy and was able to join my DataFrames serially in a loop, using a sort-merge join. See my answer below: https://mcmap.net/q/538902/-efficient-pyspark-join/…. It's working fine, but I was not able to code an iterative broadcast hash join, and that question doesn't have a proper answer yet; see the link below: https://mcmap.net/q/541782/-broadcast-hash-join-iterative Any help would be appreciated. ThanksImpel
Hello vikrantrana, I will check your question, although at the same time I would be very curious to try the suggestion of @ollik1 below. He suggests using zipPartitions on RDDs instead of a join, which sounds very interesting to me, although I could not try it so far!Oxyhydrogen
@Alexandros Biratsis- Thanks.Impel

I've had a similar problem in the past, except I didn't have that many RDDs. The most efficient solution I could find was to use the low-level RDD API. First store all the RDDs so that they are (hash) partitioned and sorted within partitions by the join column(s): https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html#repartitionAndSortWithinPartitions-org.apache.spark.Partitioner-

After this, the join can be implemented using zip partitions, without shuffling or using much memory: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/RDD.html#zipPartitions-org.apache.spark.rdd.RDD-boolean-scala.Function2-scala.reflect.ClassTag-scala.reflect.ClassTag-
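In PySpark the first step would look roughly like the sketch below (the helper name and the partition count are made up; as far as I know, zipPartitions itself is only exposed in the Scala/Java RDD API, so the final merge step would have to be done on the JVM side):

from pyspark.rdd import portable_hash

num_parts = 30

def prepare(df):
    # Key each row by ID, then hash-partition and sort within partitions,
    # so all RDDs end up with the same layout for the same IDs.
    return (df.rdd
              .map(lambda row: (row['ID'], row))
              .repartitionAndSortWithinPartitions(
                  numPartitions=num_parts,
                  partitionFunc=portable_hash))

rdd1 = prepare(df1)
rdd2 = prepare(df2)

Once all RDDs share the same partitioning and sort order, the matching partitions can be merged pairwise by ID (zipPartitions in Scala) without another shuffle.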

Pallaten answered 14/3, 2019 at 14:17 Comment(2)
Never seen such a use case in fact.Bitchy
Thanks for your comments. I have actually partitioned all my DataFrames by ID using repartition() and then joined them. But the moment I join them, the number of partitions increases to the default 200. The first part you mention I am already doing, though I am not sorting within partitions. By the way, are you suggesting co-location as well, to avoid the shuffle? If that's the case, then in Spark <= 2.4 we can't define a co-partitioner, so it would be difficult to achieve.Scofield
