Why does YARN on EMR not allocate all nodes to running Spark jobs?

I'm running a job on Apache Spark on Amazon Elastic MapReduce (EMR). Currently I'm on emr-4.1.0, which includes Amazon Hadoop 2.6.0 and Spark 1.5.0.

When I start the job, YARN correctly allocates all the worker nodes to the Spark job (with one for the driver, of course).

I have the magic "maximizeResourceAllocation" property set to "true", and the Spark property "spark.dynamicAllocation.enabled" also set to "true".

However, if I resize the EMR cluster by adding nodes to the CORE pool of worker machines, YARN only adds some of the new nodes to the Spark job.

For example, this morning I had a job that was using 26 nodes (m3.2xlarge, if that matters) - 1 for the driver, 25 executors. I wanted to speed up the job so I tried adding 8 more nodes. YARN has picked up all of the new nodes, but only allocated 1 of them to the Spark job. Spark did successfully pick up the new node and is using it as an executor, but my question is why is YARN letting the other 7 nodes just sit idle?

It's annoying for obvious reasons - I have to pay for the resources even though they're not being used, and my job hasn't sped up at all!

Does anybody know how YARN decides when to add nodes to running Spark jobs? What variables come into play? Memory? VCores? Anything?

Thanks in advance!

Horrorstruck answered 26/11, 2015 at 14:16
Yes, welcome to the annoying world of YARN! Have you set yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator in the capacity-scheduler.xml? - Anserine
I have not! I can give that a try (probably not until next week) but I am starting to suspect that Spark itself won't request more nodes than there are at the time it is started - but I could be wrong! - Horrorstruck
Good luck :) Personally, I think that YARN - not Spark - is the problem. I have never had any problems with resources not being utilized when I ran Spark in Standalone mode (before EMR 4.x). However, since upgrading to EMR 4.x (and hence YARN) I have had a million problems - including underutilization of cores... - Anserine

Okay, with the help of @sean_r_owen, I was able to track this down.

The problem was this: when setting spark.dynamicAllocation.enabled to true, spark.executor.instances shouldn't be set - an explicit value for that will override dynamic allocation and turn it off. It turns out that EMR sets it in the background if you do not set it yourself. To get the desired behaviour, you need to explicitly set spark.executor.instances to 0.

For the record, here are the contents of one of the files we pass to the --configurations flag when creating an EMR cluster:

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },

    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },

    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.executor.instances": "0"
        }
    } 
]

This gives us an EMR cluster where Spark uses all the nodes, including added nodes, when running jobs. It also appears to use all/most of the memory and all (?) the cores.

(I'm not entirely sure that it's using all the actual cores, but it is definitely using more than 1 VCore, which it wasn't before. Following Glennie Helles's advice it is now behaving better, using half of the listed VCores, which seems to equal the actual number of cores...)
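
Not part of the original configuration above, but if you also want to bound how far dynamic allocation can scale, Spark's standard dynamic-allocation properties can go into the same spark-defaults classification. The following is only a sketch with illustrative numbers; note that dynamic allocation requires the external shuffle service, which EMR normally enables for you:

    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.executor.instances": "0",
            "spark.dynamicAllocation.minExecutors": "1",
            "spark.dynamicAllocation.initialExecutors": "4",
            "spark.dynamicAllocation.maxExecutors": "100",
            "spark.shuffle.service.enabled": "true"
        }
    }

If spark.dynamicAllocation.maxExecutors is left at its default (unbounded), Spark keeps requesting executors as long as there are pending tasks, which is usually what you want when adding nodes to speed a job up.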

Horrorstruck answered 30/11, 2015 at 14:07
I am using EMR 4.4. I tried this configuration with c4.xlarge machines (2 worker nodes and 1 master), running in YARN client mode, but my resources are underutilized. Also, Spark only runs 1 executor instead of 2, with all 4 cores used by that one executor while the other node sits idle. - Collaborate
Keep in mind that the driver of your program will take one worker node. So if you only have 2 worker nodes, 1 will be allocated to the driver and the other will be used as an executor. If you want two executors, you need 3 workers. You basically end up paying for an almost entirely idle master and, depending on your Spark program, a largely idle worker as well. - Horrorstruck
dynamicAllocation is now the default configuration for EMR releases > 4.0.0 (docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/…) - Payment
Am I the only one getting the error Number of executors must be a positive number? - Longlimbed
@Longlimbed I'm not working with this stack anymore at the moment; all I can attest to are the versions of things specified in the original question. Everything worked with those versions at the time of the question and answer... - Horrorstruck
Yeah, that's quite a common problem with this stack: what was valid 6 months ago may not be anymore. - Longlimbed
I tried { "Classification": "spark-defaults", "Properties": { "spark.dynamicAllocation.enabled": "true", "spark.executor.instances": "0" } }, but it failed to start the Spark session, saying the number of executors should be a positive non-zero number. Can you let me know if it worked for you? - Kellene
@Longlimbed Number of executors must be a positive number should be fixed in Spark 2.4: issues.apache.org/jira/browse/SPARK-24241 - Wife

I observed the same behavior with nearly the same settings using emr-5.20.0. I didn't try adding nodes while the cluster was already running; instead I used TASK nodes (together with just one CORE node). I'm using InstanceFleets to define the MASTER, CORE and TASK nodes (with InstanceFleets I don't know exactly which InstanceTypes I will get, which is why I don't want to define the number of executors, cores and memory per executor myself, but want those to be maximized/optimized automatically).

With this, it only uses two TASK nodes (probably the first two nodes that are ready to use?) but never scales up, even while more TASK nodes get provisioned and finish the bootstrap phase.

What made it work in my case was setting the spark.default.parallelism parameter to the total number of cores of my TASK nodes, which is the same number used for the TargetOnDemandCapacity or TargetSpotCapacity of the TASK InstanceFleet (a rough sketch of such a fleet definition follows after the configuration below):

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },
    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.default.parallelism", <Sum_of_Cores_of_all_TASK_nodes>
        }
    } 
]
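
For context only (this is not from the setup above), a TASK InstanceFleet along the lines of the sketch below would match that advice. The names, instance types and capacities are made up; WeightedCapacity is set to the vCPU count of each type so that the fleet's target capacity adds up to the total number of TASK-node cores used for spark.default.parallelism:

    [
        {
            "Name": "Master fleet",
            "InstanceFleetType": "MASTER",
            "TargetOnDemandCapacity": 1,
            "InstanceTypeConfigs": [
                { "InstanceType": "m5.xlarge" }
            ]
        },
        {
            "Name": "Core fleet",
            "InstanceFleetType": "CORE",
            "TargetOnDemandCapacity": 1,
            "InstanceTypeConfigs": [
                { "InstanceType": "m5.xlarge" }
            ]
        },
        {
            "Name": "Task fleet",
            "InstanceFleetType": "TASK",
            "TargetSpotCapacity": 32,
            "InstanceTypeConfigs": [
                { "InstanceType": "m5.2xlarge", "WeightedCapacity": 8 },
                { "InstanceType": "m5.4xlarge", "WeightedCapacity": 16 }
            ]
        }
    ]

With the weights equal to vCPUs, a TargetSpotCapacity of 32 corresponds to roughly 32 TASK cores, so spark.default.parallelism would be set to 32 in this example.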

For the sake of completeness: I'm using one CORE node and several TASK nodes mainly to make sure the cluster has at least 3 nodes (1 MASTER, 1 CORE and at least one TASK node). Before that I tried to use only CORE nodes, but since in my case the number of cores is calculated depending on the actual task, it was possible to end up with a cluster consisting of just one MASTER and one CORE node. With the maximizeResourceAllocation option, such a cluster runs forever doing nothing, because the executor running the YARN application master occupies that single CORE node completely.

Borreri answered 25/3, 2019 at 12:19
