How can I wait for completion of an Elastic MapReduce job flow in a Java application?
Recently I've been working with Amazon Web Services (AWS) and I've noticed there is not much documentation on the subject, so I added my solution.

I was writing an application using Amazon Elastic MapReduce (Amazon EMR). After the calculations ended I needed to perform some work on the files created by them, so I needed to know when the job flow completed its work.

This is how you can check if your job flow completed:

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials);

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest()
    .withJobFlowStates("COMPLETED");

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows();
// Note: jobs will be empty if no job flow is in the COMPLETED state yet
JobFlowDetail detail = jobs.get(0);

detail.getJobFlowId(); // the id of one of the completed job flows

You can also look for a specific job flow id in DescribeJobFlowsRequest and then check whether that job has finished or failed.
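
For example, here is a minimal polling sketch, assuming the mapReduce client from above and a jobFlowId returned by a previous runJobFlow call; the 30-second interval and the set of terminal states are illustrative choices, not requirements:

private String waitForJobFlow(AmazonElasticMapReduce mapReduce, String jobFlowId)
        throws InterruptedException {
    while (true) {
        DescribeJobFlowsRequest request = new DescribeJobFlowsRequest()
            .withJobFlowIds(jobFlowId);
        JobFlowDetail detail =
            mapReduce.describeJobFlows(request).getJobFlows().get(0);
        String state = detail.getExecutionStatusDetail().getState();
        if ("COMPLETED".equals(state) || "FAILED".equals(state)
                || "TERMINATED".equals(state)) {
            return state; // the job flow is no longer running
        }
        Thread.sleep(30000); // wait 30 seconds between polls
    }
}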

I hope this helps others.

Electrostriction answered 25/5, 2012 at 16:47 Comment(3)
Submitting your own solution to your problem immediately is quite welcome here; however, the desired approach is still to split this into a question and an answer - see It’s OK to Ask and Answer Your Own Questions. This helps sort/categorize things appropriately, i.e. make room for genuinely unanswered questions where applicable, thanks!Intercostal
Thanks, I'll note it for future reference.Electrostriction
You should include the other completed states as well. Some folks reading this might loop forever if they initialize jobAttributes as given. DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest().withJobFlowStates("COMPLETED", "TERMINATED", "FAILED");Tody
I also ran into this problem, and here's the solution I came up with for now. It's not perfect, but hopefully it'll be helpful. For reference, I'm using Java 1.7 and AWS Java SDK version 1.9.13.

Note that this code assumes you're waiting for the cluster to terminate, not strictly for the steps to finish; if your cluster terminates when all your steps are done this is fine, but if you're using a cluster that stays alive after step completion it won't help you much.

Also note that this code monitors and logs cluster state changes, diagnoses whether the cluster terminated with errors, and throws an exception if it did.

private void yourMainMethod() throws InterruptedException {
    // "emr" is assumed to be an AmazonElasticMapReduceClient initialized elsewhere
    RunJobFlowRequest request = ...;

    try {
        RunJobFlowResult submission = emr.runJobFlow(request);
        String jobFlowId = submission.getJobFlowId();
        log.info("Submitted EMR job as job flow id {}", jobFlowId);

        DescribeClusterResult result = 
            waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS);
        diagnoseClusterResult(result, jobFlowId);
    } finally {
        emr.shutdown();
    }
}

private DescribeClusterResult waitForCompletion(
             AmazonElasticMapReduceClient emr, String jobFlowId,
             long sleepTime, TimeUnit timeUnit)
        throws InterruptedException {
    String state = "STARTING";
    while (true) {
        DescribeClusterResult result = emr.describeCluster(
                new DescribeClusterRequest().withClusterId(jobFlowId)
        );
        ClusterStatus status = result.getCluster().getStatus();
        String newState = status.getState();
        if (!state.equals(newState)) {
            log.info("Cluster id {} switched from {} to {}.  Reason: {}.",
                     jobFlowId, state, newState, status.getStateChangeReason());
            state = newState;
        }

        switch (state) {
            case "TERMINATED":
            case "TERMINATED_WITH_ERRORS":
            case "WAITING":
                return result;
        }

        timeUnit.sleep(sleepTime);
    }
}

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) {
    ClusterStatus status = result.getCluster().getStatus();
    ClusterStateChangeReason reason = status.getStateChangeReason();
    ClusterStateChangeReasonCode code = 
        ClusterStateChangeReasonCode.fromValue(reason.getCode());
    switch (code) {
    case ALL_STEPS_COMPLETED:
        log.info("Completed EMR job {}", jobFlowId);
        break;
    default:
        failEMR(jobFlowId, status);
    }
}

private static void failEMR(String jobFlowId, ClusterStatus status) {
    String msg = "EMR cluster run %s terminated with errors.  ClusterStatus = %s";
    throw new RuntimeException(String.format(msg, jobFlowId, status));
}
Ovoid answered 20/12, 2014 at 0:46 Comment(0)
Once the job flow completes, the cluster stops and the HDFS partition is lost. To prevent data loss, configure the last step of the job flow to store its results in Amazon S3.
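
A sketch of what such a final step might look like, using the SDK's StepConfig and HadoopJarStepConfig types; the bucket, jar, and step names are hypothetical placeholders:

HadoopJarStepConfig jarStep = new HadoopJarStepConfig()
    .withJar("s3://my-bucket/my-job.jar")      // hypothetical jar location
    .withArgs("s3://my-bucket/input/",         // hypothetical input path
              "s3://my-bucket/output/");       // results written to S3, not HDFS

StepConfig lastStep = new StepConfig()
    .withName("final-step-writing-to-s3")      // hypothetical step name
    .withActionOnFailure("TERMINATE_JOB_FLOW")
    .withHadoopJarStep(jarStep);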

If the JobFlowInstancesDetail.KeepJobFlowAliveWhenNoSteps parameter is set to TRUE, the job flow will transition to the WAITING state rather than shutting down once the steps have completed.
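
On the request side of the Java SDK, this corresponds to JobFlowInstancesConfig.withKeepJobFlowAliveWhenNoSteps; a minimal sketch, with illustrative instance settings:

JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
    .withInstanceCount(3)                    // illustrative cluster size
    .withMasterInstanceType("m1.medium")     // illustrative instance types
    .withSlaveInstanceType("m1.medium")
    .withKeepJobFlowAliveWhenNoSteps(true);  // cluster waits instead of terminating

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("my-job-flow")                 // hypothetical name
    .withInstances(instances);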

A maximum of 256 steps are allowed in each job flow.

If your job is time-consuming, I recommend storing intermediate results periodically.

Long story short: there is no way of knowing when it is done. Instead, you need to save your data as part of the job.

Immediately answered 8/7, 2012 at 7:37 Comment(0)
Use the --wait-for-steps option when creating the job flow.

./elastic-mapreduce --create \
...
 --wait-for-steps \
...
Exhilarate answered 24/3, 2013 at 7:4 Comment(0)
