Spark pulling data into an RDD, DataFrame, or Dataset

I'm trying to put into simple terms when Spark pulls data through the driver, and when it doesn't need to pull data through the driver.

I have 3 questions -

  1. Let's say you have a 20 TB flat file stored in HDFS, and from a driver program you pull it into a DataFrame or an RDD using one of the respective libraries' out-of-the-box functions (sc.textFile(path) or sc.textFile(path).toDF, etc.; see the sketch after this list). Will it cause the driver program to OOM if the driver runs with only 32 GB of memory? Or at least cause swapping on the driver JVM? Or will Spark and Hadoop be smart enough to distribute the data from HDFS straight to the Spark executors to build the DataFrame/RDD without going through the driver?
  2. The exact same question as 1, except reading from an external RDBMS?
  3. The exact same question as 1, except reading from a specific node's file system (a plain Unix file system holding the 20 TB file, not HDFS)?
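
For concreteness, here is a rough sketch of the calls I mean in question 1 (the path is made up, and spark is the usual SparkSession from spark-shell):

    import spark.implicits._   // needed for rdd.toDF in Spark 2.x

    // Hypothetical path; the question is whether either of these lines forces
    // the 20 TB through the driver JVM.
    val rdd = sc.textFile("hdfs:///data/huge_20tb_file.txt")
    val df  = rdd.toDF("line")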
Abdella answered 20/8, 2016 at 3:38 Comment(1)
I have seen the first two cases: data larger than driver memory loads without any error, so yes for the first two. If the third doesn't work, you can always push the file to HDFS and it reduces to the first question. Strictly speaking you have two questions, though they are numbered 1 to 3.Reunionist

Regarding 1

Spark operates on distributed data structures such as RDD and Dataset (and DataFrame before 2.0). Here are the facts you should know about these data structures to answer your question:

  1. All transformation operations (map, filter, etc.) are lazy. This means no reading is performed until you require a concrete result of your operations (an action like reduce, fold, or saving the result to a file).
  2. When processing a file on HDFS, Spark operates on file partitions. A partition is the minimal logical batch of data that can be processed. Normally one partition equals one HDFS block, and the total number of partitions can never be less than the number of blocks in the file. The common (and default) HDFS block size is 128 MB.
  3. All actual computation (including reading from HDFS) on an RDD or Dataset is performed inside executors, never on the driver. The driver builds a DAG and logical execution plan and assigns tasks to executors for processing.
  4. Each executor runs its assigned tasks against particular partitions of data. So normally, if you allocate only one core to an executor, it processes no more than 128 MB (the default HDFS block size) of data at a time.

So basically, when you invoke sc.textFile no actual reading happens. Together, these facts explain why an OOM does not occur on the driver even when processing 20 TB of data.
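
As a minimal sketch (the path is made up), nothing is read until the action at the end, and the reading that does happen is performed by executors on their own partitions:

    // Nothing is read from HDFS here: textFile and filter are lazy
    // transformations that only build up the execution plan (DAG).
    val lines  = sc.textFile("hdfs:///data/huge_20tb_file.txt")   // hypothetical path
    val errors = lines.filter(_.contains("ERROR"))

    // Only this action makes the driver schedule tasks. Each executor reads
    // and filters its own partitions (roughly one HDFS block each), and only
    // the small aggregated count travels back to the driver.
    val errorCount = errors.count()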

There are some special cases, such as join operations. But even in those cases the executors spill their intermediate results to local disk for further processing, rather than sending them to the driver.

Regarding 2

In the case of JDBC, you can decide how many partitions you will have for your table, and choose an appropriate partition column that splits the data into partitions evenly. It's up to you how much data is loaded into memory at the same time.
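
For example, a partitioned JDBC read might look like the following sketch (the URL, table, column, bounds, and credentials are placeholders). Each of the numPartitions tasks issues its own range query, so no single JVM ever has to hold the whole table:

    // Placeholder connection details, table, column, and bounds.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://dbhost:5432/mydb")
      .option("dbtable", "big_table")
      .option("user", "spark_user")
      .option("password", "secret")
      .option("partitionColumn", "id")      // numeric column used to split the table
      .option("lowerBound", "1")
      .option("upperBound", "1000000000")
      .option("numPartitions", "200")       // 200 parallel range queries, one per partition
      .load()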

Regarding 3

The block size of a local file is controlled by the fs.local.block.size property (I guess 32 MB by default). So it is basically the same as case 1 (an HDFS file), except that you will read all the data from one machine and one physical disk drive, which is extremely inefficient for a 20 TB file.
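
As an illustration (the path is made up), reading a local file only differs in the URI scheme; as the comments below point out, the file then has to be reachable at the same path on every machine that runs an executor:

    // file:// reads from the node-local filesystem instead of HDFS.
    // The path must exist on every node that runs an executor, otherwise
    // tasks scheduled on other nodes fail with FileNotFoundException.
    val localRdd = sc.textFile("file:///data/huge_20tb_file.txt", minPartitions = 160)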

Preemption answered 22/8, 2016 at 21:56 Comment(4)
I did a test on a cluster I have and definitely pulled a much bigger data file from HDFS and a bigger table from a Postgres instance into my Spark cluster without any problem. Not TB, but over 40 GB. I pulled a local file into Spark as well and it also worked, but I couldn't have it on just one machine; I needed to put the 40 GB file on all machines that had executors.Abdella
The summary of my explanation is that Spark will never load the whole dataset into memory, only small chunks of it. To distribute your 40 GB file across the cluster you can use HDFS. In that case Spark will assign tasks to executors with respect to data locality, meaning an executor will (in the best case) process only those blocks located on the same machine it is running on.Preemption
I already tested parts 1 and 2, and yes, it pulled the data from the RDBMS and from HDFS fine, and the file and the table were much bigger than all the executors' memory put together. The driver would only come into play if the whole dataset were returned to a Java array inside the application. Thanks for your answer. Of course I can put the data into HDFS to distribute it. I had to make three local copies of the file to read it into Spark from the local drive, which was not a problem I anticipated when I began the test.Abdella
Regarding the driver: yes, you're absolutely right. If you collect all the data to the driver, it really can run out of memory. This is also true for any operation whose result on the driver can grow unbounded, like countByKey (or take with a very large argument). I'm glad my answer was helpful.Preemption
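
As an illustrative sketch of that last point (the path is hypothetical), the driver is only at risk when an action materializes a large result on it:

    val lines = sc.textFile("hdfs:///data/huge_20tb_file.txt")   // hypothetical path

    // Safe: only a single Long travels back to the driver.
    val total = lines.count()

    // Risky: collect() ships every row to the driver JVM, which will run out
    // of memory long before 20 TB fits into a 32 GB heap. The same applies to
    // any action whose result size is unbounded (e.g. countByKey on many keys).
    // val everything = lines.collect()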
