Apache Spark: The number of cores vs. the number of executors

I'm trying to understand the relationship between the number of cores and the number of executors when running a Spark job on YARN.

The test environment is as follows:

  • Number of data nodes: 3
  • Data node machine spec:
    • CPU: Core i7-4790 (# of cores: 4, # of threads: 8)
    • RAM: 32GB (8GB x 4)
    • HDD: 8TB (2TB x 4)
  • Network: 1Gb

  • Spark version: 1.0.0

  • Hadoop version: 2.4.0 (Hortonworks HDP 2.1)

  • Spark job flow: sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile (a rough sketch follows this list)

  • Input data

    • Type: single text file
    • Size: 165GB
    • Number of lines: 454,568,833
  • Output

    • Number of lines after second filter: 310,640,717
    • Number of lines of the result file: 99,848,268
    • Size of the result file: 41GB
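
For reference, a rough Scala sketch of a pipeline with this shape (the paths, parsing, and filter conditions are placeholders; the original job appears to use the Java API, given mapToPair):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // PairRDD implicits, needed on Spark 1.0

    object JobSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("cores-vs-executors-test"))

        sc.textFile("hdfs:///data/input.txt")            // single 165GB text file
          .filter(_.nonEmpty)                            // first filter (assumed condition)
          .map(_.split("\t"))                            // parse each line (assumed format)
          .filter(_.length > 1)                          // second filter (assumed condition)
          .map(fields => (fields(0), 1L))                // Scala equivalent of mapToPair
          .reduceByKey(_ + _)                            // the shuffle happens here
          .map { case (k, v) => s"$k\t$v" }
          .saveAsTextFile("hdfs:///data/output")

        sc.stop()
      }
    }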

The job was run with the following configurations:

  1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (one executor per data node, using as many cores as possible)

  2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (# of cores reduced)

  3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (fewer cores, more executors)

Elapsed times:

  1. 50 min 15 sec

  2. 55 min 48 sec

  3. 31 min 23 sec

To my surprise, (3) was much faster.
I thought that (1) would be faster, since there would be less inter-executor communication when shuffling.
Although (1) has fewer total cores than (3), the number of cores does not seem to be the key factor, since (2) performed well.

(The following was added after pwilmot's answer.)

For reference, the performance monitoring screenshots are as follows:

  • Ganglia data node summary for (1) - job started at 04:37.

  • Ganglia data node summary for (3) - job started at 19:47. Please ignore the graph before that time.

The graph roughly divides into 2 sections:

  • First: from start to reduceByKey: CPU intensive, no network activity
  • Second: after reduceByKey: CPU usage drops, network I/O takes place.

As the graph shows, (1) can use as much CPU power as it was given, so the problem might not be the number of threads.

How to explain this result?

Fictionalize answered 8/7, 2014 at 0:46 Comment(8)
Now I'm suspecting GC... In fact, on Spark UI the total time spent for GC is longer on 1) than 2).Fictionalize
Why didn't you try 3) with 19G? Could it be that confining the workers to 4G reduces the NUMA effect that some people have spotted? i.e. your 4G is located on one of the 2 cores allocated to your workflow, and thus there is less I/O slowdown, leading to better overall performance. Otherwise I think a main question is: how many cores/threads can a single executor use on a worker? (One can only specify the total number of cores for a worker, not at the granularity of the executor.)Sawmill
Btw I just checked the code at core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala and it seems that 1 executor = 1 worker's thread.Sawmill
a bit late but here is a post on cloudera on this topic: blog.cloudera.com/blog/2015/03/…Mann
By the way, I found this information in a Cloudera slide deck slideshare.net/cloudera/… , which explains a bit about the decision making for executors, cores and memory.Mccallion
How many cores are available in the YARN resource manager? Can you please add that snippet here?Vietnam
Is this a correct statement? "Although (1) has fewer total cores than (3), the number of cores does not seem to be the key factor, since (2) performed well." From the 3 configurations given, the inverse of this statement is true.Jedjedd
Did you also have enough partitions and spark.default.parallelism to take advantage of the number of threads (say 50+)? Even better, were these parameters held constant for all the runs? Then one could definitively point to a throughput bottleneck.Madly

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.

The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:

  • 63GB + the executor memory overhead won't fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won't be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.

A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?

This config results in three executors on all nodes except for the one with the AM, which will have two executors. --executor-memory was derived as 63GB / 3 executors per node = 21GB per executor; the expected memory overhead is 21 * 0.07 = 1.47GB; and 21 - 1.47 ~ 19GB, hence --executor-memory 19G.
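
For illustration, here is the same arithmetic written out as a small Scala sketch (the 0.07 factor mirrors the memory-overhead fraction used in the blog post; the node counts are from the six-node example above):

    object ExecutorSizing {
      def main(args: Array[String]): Unit = {
        val nodes            = 6
        val coresPerNode     = 15      // 16 physical cores, 1 reserved for OS/Hadoop daemons
        val memPerNodeGB     = 63.0    // 64GB, 1GB reserved for OS/Hadoop daemons
        val coresPerExecutor = 5       // keep <= 5 to preserve HDFS client throughput
        val overheadFraction = 0.07    // executor memory overhead assumed by the blog post

        val executorsPerNode = coresPerNode / coresPerExecutor           // 3
        val numExecutors     = nodes * executorsPerNode - 1              // 17, one slot left for the AM
        val memPerExecutorGB = memPerNodeGB / executorsPerNode           // 21
        val executorMemoryGB = (memPerExecutorGB * (1 - overheadFraction)).floor.toInt  // 19

        println(s"--num-executors $numExecutors " +
                s"--executor-cores $coresPerExecutor " +
                s"--executor-memory ${executorMemoryGB}G")
      }
    }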

The explanation was given in an article in Cloudera's blog, How-to: Tune Your Apache Spark Jobs (Part 2).

Diluvium answered 8/6, 2016 at 17:59 Comment(11)
"This config results in three executors on all nodes except for the one with the AM, which will have two executors. ". What does this mean regarding with "--executor-cores 5"?Aeolian
It means each executor uses 5 cores. Each node has 3 executors therefore using 15 cores, except one of the nodes will also be running the application master for the job, so can only host 2 executors i.e. 10 cores in use as executors.Raychel
Nicely explained - please note that this applies to yarn.scheduler.capacity.resource-calculator disabled, which is the default. This is because by default it schedules by Memory and not by CPU.Endosperm
More executors can lead to bad HDFS I/O throughput. So if I'm not using HDFS at all, can I use more than 5 cores per executor?Heading
I thought the Application Master runs on each node. Per the above, there would be only one Application Master to run the job. Is that correct?Ossian
@Raychel I still didn't understand how OP arrived at 5 executor cores. Can it also be 3 executor cores, 28 executors? or any other combination as long as we use up the number of cores? Or is there a formula to first decide the number of executor cores?Ensor
@Ensor tbh it is crazy that people need to think about these things, the platform you are using should manage it. Newer Spark versions have more flexible settings for max allocation rather than specifics. Yes you're right it could be some other config, as long as you balance cpu, memory and IO per executor given the host node's specs. There's no hard and fast formula, and is highly dependent on your specific job. There's a spreadsheet in this blog that I did find very helpful to understand the possible settings: c2fo.io/c2fo/spark/aws/emr/2016/07/06/…Raychel
@deppfx, the author states that he's making a "rough guess", from personal experience, that 5 cores per executor is the highest setting that doesn't create HDFS throughput issues. In any case, this always depends on the workload, and that's one of the reasons why the platform can't optimize automatically; in fact, I tried the settings suggested in the post on a specific workload and got worse results than I had before.Osy
@Raychel each job has its own memory requirements. If you put too many executors with too little RAM each, they will crash from an overflow. If you put too few executors, then you will be I/O bound, too many CPU bound. It is a matter of balance for your job.Ethben
@Ethben sure, and having to tweak those settings and having those java heap space out of memory errors or jobs take too long is just painful and a waste of time. RDBMS query execution planners have been addressing these issues for years. I want these things managed for me. Workload estimation is possible if you have appropriate metadata: file count, file size, partitions, statistics etc. If you are running a 2 hour spark job then taking a few minutes up front to plan and auto-size a cluster would be a cost worth paying.Raychel
@Raychel EMR and similar platforms manage this for you, but particularly demanding jobs crash Spark and data warehouses alike.Ethben

Short answer: I think tgbaggio is right. You hit HDFS throughput limits on your executors.

I think the answer may be a little simpler than some of the recommendations here.

The clue for me is in the cluster network graph. For run 1 the utilization is steady at ~50 MB/s. For run 3 the steady utilization is doubled, at around 100 MB/s.

From the Cloudera blog post shared by DzOrd, you can see this important quote:

I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.

So, let's do a few calculations to see what performance we expect if that is true.


Run 1: 19 GB, 7 cores, 3 executors

  • 3 executors x 7 threads = 21 threads
  • with 7 cores per executor, we expect limited IO to HDFS (maxes out at ~5 cores)
  • effective throughput ~= 3 executors x 5 threads = 15 threads

Run 3: 4 GB, 2 cores, 12 executors

  • 12 executors x 2 threads = 24 threads
  • 2 cores per executor, so HDFS throughput is OK
  • effective throughput ~= 12 executors x 2 threads = 24 threads

If the job is 100% limited by concurrency (the number of threads), we would expect runtime to be perfectly inversely correlated with the number of threads.

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

So ratio_num_threads ~= inv_ratio_runtime, and it looks like we are network limited.

This same effect explains the difference between Run 1 and Run 2.


Run 2: 19 GB, 4 cores, 3 executors

  • 3 executors x 4 threads = 12 threads
  • with 4 cores per executor, ok IO to HDFS
  • effective throughput ~= 3 executors x 4 threads = 12 threads

Comparing the number of effective threads and the runtime:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

It's not as perfect as the last comparison, but we still see a similar drop in performance when we lose threads.

Now for the last bit: why do we get better performance with more threads, especially more threads than the number of CPUs?

A good explanation of the difference between parallelism (what we get by dividing up data onto multiple CPUs) and concurrency (what we get when we use multiple threads to do work on a single CPU) is provided in this great post by Rob Pike: Concurrency is not parallelism.

The short explanation is that if a Spark job is interacting with a file system or network the CPU spends a lot of time waiting on communication with those interfaces and not spending a lot of time actually "doing work". By giving those CPUs more than 1 task to work on at a time, they are spending less time waiting and more time working, and you see better performance.
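
As a toy, Spark-free illustration of that point, the sketch below runs the same batch of sleep-heavy "tasks" (standing in for I/O waits) on a small and a larger thread pool; the pool sizes and task count are arbitrary values chosen for the example:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    object IoBoundDemo {
      // A task that spends almost all of its time "waiting on I/O".
      def ioTask(): Int = { Thread.sleep(100); 1 }

      def timeWith(threads: Int, tasks: Int): Long = {
        val pool = Executors.newFixedThreadPool(threads)
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
        val start = System.nanoTime()
        Await.result(Future.sequence((1 to tasks).map(_ => Future(ioTask()))), 1.minute)
        pool.shutdown()
        (System.nanoTime() - start) / 1000000   // elapsed milliseconds
      }

      def main(args: Array[String]): Unit = {
        // More threads than cores still helps, because the waits overlap.
        println(s"4 threads:  ${timeWith(threads = 4, tasks = 32)} ms")   // ~800 ms
        println(s"16 threads: ${timeWith(threads = 16, tasks = 32)} ms")  // ~200 ms
      }
    }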

Broccoli answered 7/1, 2019 at 22:41 Comment(2)
Interesting and convincing explanation. I wonder how you came up with your guess that the executor has a 5-task limit to achieve maximum throughput.Germin
So the number 5 isn't something I came up with: I just noticed signs of IO bottlenecking and went off in search of where those bottlenecks may be coming from.Broccoli

As you run your Spark app on top of HDFS, according to Sandy Ryza:

I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.

So I believe that your first configuration is slower than the third one because of bad HDFS I/O throughput.

Angelicaangelico answered 6/10, 2015 at 7:32 Comment(0)

From the excellent resources available at RStudio's Sparklyr package page:

SPARK DEFINITIONS:

It may be useful to provide some simple definitions for the Spark nomenclature:

Node: A server

Worker Node: A server that is part of the cluster and is available to run Spark jobs

Master Node: The server that coordinates the Worker nodes.

Executor: A sort of virtual machine inside a node. One Node can have multiple Executors.

Driver Node: The Node that initiates the Spark session. Typically, this will be the server where sparklyr is located.

Driver (Executor): The Driver Node will also show up in the Executor list.

Workshop answered 22/4, 2018 at 21:0 Comment(0)

I haven't played with these settings myself, so this is just speculation, but if we think about this issue in terms of normal cores and threads in a distributed system, then in your cluster you can use up to 12 cores (4 * 3 machines) and 24 threads (8 * 3 machines). In your first two examples you are giving your job a fair number of cores (potential computation space), but the number of threads (jobs) to run on those cores is so limited that you aren't able to use much of the processing power allocated, and thus the job is slower even though more computation resources are allocated.

You mention that your concern was in the shuffle step: while it is nice to limit the overhead in the shuffle step, it is generally much more important to utilize the parallelization of the cluster. Think about the extreme case: a single-threaded program with zero shuffle.
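
If it helps to check this, here is a minimal sketch for inspecting how many tasks a stage can actually run in parallel (the input path and the repartition factor are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    object ParallelismCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("parallelism-check"))

        val rdd = sc.textFile("hdfs:///data/input.txt")
        println(s"input partitions:    ${rdd.partitions.length}")
        println(s"default parallelism: ${sc.defaultParallelism}")

        // If the partition count is well below the total executor cores, widen it
        // (or pass a larger numPartitions to reduceByKey) so every core has work.
        val widened = rdd.repartition(sc.defaultParallelism * 2)
        println(s"after repartition:   ${widened.partitions.length}")

        sc.stop()
      }
    }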

Yasmin answered 8/7, 2014 at 1:2 Comment(3)
Thanks for your answer. But I suspect that the number of threads is not the main problem. I've added the monitoring screen captures. As the graphs show, (1) can use as much CPU power as it was given.Fictionalize
@Fictionalize pwilmot is correct - you need 2-4 tasks MINIMUM in order to utilize the full potential of your cores. Put it this way - I usually use at least 1000 partitions for my 80-core cluster.Wayless
@Wayless What I want to know is the reason for the performance difference between (1) and (3). When I watch the Spark UI, both run 21 tasks in parallel in section 2. (Why 21 instead of 24 in the case of (3) is unknown for now.) But the tasks for (3) just run faster.Fictionalize

I think one of the major reasons is locality. Your input file size is 165GB, so the file's blocks are certainly distributed over multiple DataNodes, and more executors can avoid network copying.

Try setting the number of executors equal to the number of blocks; I think it can be faster.
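
As a rough illustration of what that would mean here, assuming the HDFS default 128MB block size (the cluster's actual block size isn't given in the question):

    object BlockCountEstimate {
      def main(args: Array[String]): Unit = {
        val inputBytes = 165L * 1024 * 1024 * 1024   // 165GB input file
        val blockBytes = 128L * 1024 * 1024          // assumed HDFS block size
        val blocks     = math.ceil(inputBytes.toDouble / blockBytes).toInt
        println(s"~$blocks blocks, i.e. ~$blocks input partitions/map tasks")   // 1320
      }
    }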

Anastasia answered 6/9, 2015 at 12:9 Comment(0)

Spark dynamic allocation gives flexibility and allocates resources dynamically. The minimum and maximum number of executors can be specified, as can the number of executors to launch at the start of the application.

Read more about it here:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
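
For illustration, a minimal sketch of turning dynamic allocation on when building the SparkConf (the min/initial/max values are arbitrary examples; on YARN the external shuffle service also has to be enabled):

    import org.apache.spark.{SparkConf, SparkContext}

    object DynamicAllocationExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("dynamic-allocation-example")
          // Let Spark grow and shrink the executor pool based on the task backlog.
          .set("spark.dynamicAllocation.enabled", "true")
          // Required on YARN so shuffle files outlive the executors that wrote them.
          .set("spark.shuffle.service.enabled", "true")
          // Example bounds; tune these to the cluster and workload.
          .set("spark.dynamicAllocation.minExecutors", "2")
          .set("spark.dynamicAllocation.initialExecutors", "4")
          .set("spark.dynamicAllocation.maxExecutors", "12")

        val sc = new SparkContext(conf)
        // ... job goes here ...
        sc.stop()
      }
    }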

Paradigm answered 13/11, 2016 at 6:1 Comment(0)

I think there is a small issue with the first two configurations. The idea behind threading is that if a core is idle, it can be used to process data, so the memory is not fully utilized in the first two cases. If you want to benchmark this example, choose machines that have more than 10 cores each, and then run the benchmark.

But don't give more than 5 cores per executor, as there will be a bottleneck in I/O performance.

So the best machines to do this benchmarking might be data nodes which have 10 cores.

Hypothetical data node machine spec: CPU: 10 cores, 20 threads; RAM: 32GB (8GB x 4); HDD: 8TB (2TB x 4)

Vietnam answered 30/10, 2017 at 2:26 Comment(0)

In configuration (2) you're reducing the number of parallel tasks, and thus I believe your comparison isn't fair. Make --num-executors at least 5, so that you have 20 tasks running, in comparison to the 21 tasks in configuration (1). Then, in my view, the comparison will be fair.

Also, please calculate the executor memory accordingly.

Schmuck answered 3/8, 2022 at 15:49 Comment(0)
