Spring Batch resume after server's failure

I am using Spring Batch to parse files, and I have the following scenario:

I am running a job that has to parse a given file. For some unexpected reason (say, a power cut) the server fails and I have to restart the machine. After restarting the server, I want to resume the job from the point where it stopped before the power cut. That means that if the system had read 1,300 of 10,000 rows, it now has to start reading from row 1,301.

How can I achieve this scenario using spring batch?

About the configuration: I use Spring Integration, which polls a directory for new files. When a file arrives, Spring Integration launches the Spring Batch job. The job uses a FlatFileItemReader to parse the file.
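
For reference, here is a simplified sketch of such a reader setup (the resource path, class name, and bean wiring are placeholders, not the real configuration). FlatFileItemReader persists its read count to the step's ExecutionContext on each chunk commit, which is the mechanism a restart relies on to skip already-processed rows:

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class ReaderConfig {

    @Bean
    @StepScope
    public FlatFileItemReader<String> fileReader() {
        FlatFileItemReader<String> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource("input/data.txt")); // placeholder path
        reader.setLineMapper(new PassThroughLineMapper());
        // saveState is true by default: the read count is stored in the
        // ExecutionContext on each chunk commit, so a restarted execution
        // resumes after the last committed row.
        reader.setSaveState(true);
        return reader;
    }
}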

Dropsical answered 5/3, 2013 at 13:24 Comment(2)
It has been a long time since this question was asked. Is there an officially supported solution now? – Coacher
Any way to solve this problem? I tried the below approaches, but a new job instance is created with null job parameters, and it is thus unable to process the input file. I am using a file poller which should start the job when a file is available. Once the job fails, on restarting the application it should start from the same input line where it left off. That is not happening. – Incise

Here is a complete solution to restart a job after a JVM crash.

1. Make the job restartable by setting restartable="true":

<job id="jobName" xmlns="http://www.springframework.org/schema/batch" restartable="true">
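
For completeness, the Java-config equivalent (a minimal sketch; the bean and step names are illustrative): jobs built through JobBuilderFactory are restartable by default, and calling preventRestart() is what would turn restartability off.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.context.annotation.Bean;

// Restartable by default; do NOT call preventRestart() on the builder.
@Bean
public Job jobName(JobBuilderFactory jobs, Step step) {
    return jobs.get("jobName")
            .start(step)
            .build();
}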

2. Code to restart the job:

import java.util.Date;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;

public class RestartJob {

    @Autowired
    private JobExplorer jobExplorer;
    @Autowired
    JobRepository jobRepository;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired 
    JobOperator jobOperator;

    public void restart(){
        try {
        List<JobInstance> jobInstances = jobExplorer.getJobInstances("jobName", 0, 1); // fetches the most recent instance of this job from the repository
            if(CollectionUtils.isNotEmpty(jobInstances)){
               JobInstance jobInstance =  jobInstances.get(0);
               List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
               if(CollectionUtils.isNotEmpty(jobExecutions)){
                   for(JobExecution execution: jobExecutions){
                       // If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
                       if(execution.getStatus().equals(BatchStatus.STARTED)){ 
                           execution.setEndTime(new Date());
                           execution.setStatus(BatchStatus.FAILED);                               
                           execution.setExitStatus(ExitStatus.FAILED);                               
                           jobRepository.update(execution);
                           jobOperator.restart(execution.getId());
                       }
                   }
               }
            }
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}

3. The required bean configuration:

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:lobHandler-ref="oracleLobHandler"/>

<bean id="oracleLobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>


<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean" p:dataSource-ref="dataSource" />

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="jobLauncherTaskExecutor" /> 
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator" p:jobLauncher-ref="jobLauncher" p:jobExplorer-re`enter code here`f="jobExplorer" p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry"/>
Poynter answered 25/3, 2016 at 17:50 Comment(11)
The above logic wouldn't restart the job if there are any job steps in STARTED status. Those step executions also need to be marked as FAILED in addition to the job execution status. Also, restart becomes meaningless for long-running single-step jobs, since this logic starts steps from the beginning, ignoring already processed chunks. How do you achieve step-level restart/resume that takes already processed chunks into account, i.e. a pure resume? – Handgun
1. I would say the code shouldn't change STARTED status to FAILED, otherwise whenever the scheduler/polling starts a new execution it may change the status of a correctly running step from STARTED to FAILED. Handle it separately if you think a particular step is taking more time than usual. 2. If you use one of Spring's readers, e.g. StaxEventItemReader or HibernateCursorItemReader, you need not bother about already processed chunks; Spring handles that. If you have your own custom reader, you can use the BATCH_STEP_EXECUTION table to get information about processed records. – Poynter
My reader is JdbcPagingItemReader with a custom processor & writer. I later asked this question, where I show the code I have written. Your concern is valid that with this logic a properly running job might be marked as failed. – Handgun
@SabirKhan Try to modify the following code to suit your new question's requirement: stepExecution.getExecutionContext().putInt("StaxEventItemReader.read.count", stepExecution.getWriteCount()); – Poynter
To prevent new jobs being set to failed, you could also check that the creation date is earlier than the JVM startup date. See e.g. here for how to get the JVM start time. – Coacher
Hi, does anyone have an idea where to put the RestartJob code, or where to call the restart function? – Incise
@Incise You can call RestartJob from your polling logic or from a scheduler. – Poynter
Hi @SumitSundriyal, thanks. My app uses a file watcher to look for a file and launches the job using a JobLaunchRequest, i.e. JobLaunchingMessageHandler, adding the input file as a job parameter. This launches my job. Now please help clarify where to call this RestartJob. – Incise
@Incise I had a similar process. I was handling this with a Spring job which runs every minute. That job was responsible for starting a new job or resuming a previously stopped job. If you don't have a custom job, you have to look into the API docs and extend and override the provided processing methods (which are called by Spring Batch internally). – Poynter
Hi Sumit, thanks for guiding me on this. I solved it by plugging in the above piece of code, minus the jobOperator.restart() call, to just update the job repository in the DB inside the CustomJobLaunchingMessageHandler, which uses jobLauncher.run() to start the job. This way the restart is handled automatically; jobOperator.restart() internally uses jobLauncher.run() as well, so I removed that bit from the code. Working like a charm. Thanks a lot, it saved my nights and I learned the basics here very well. – Incise
@Mahmoud Ben Hassine Could you please help us here? – Amias

An updated work-around for Spring Batch 4. It takes the JVM startup time into account when detecting broken jobs. Please note that this will not work in a clustered environment where multiple servers start jobs.

@Bean
public ApplicationListener<ContextRefreshedEvent> resumeJobsListener(JobOperator jobOperator, JobRepository jobRepository,
        JobExplorer jobExplorer) {
    // restart jobs that failed due to a JVM crash
    return event -> {
        Date jvmStartTime = new Date(ManagementFactory.getRuntimeMXBean().getStartTime());

        // for each job
        for (String jobName : jobExplorer.getJobNames()) {
            // get latest job instance
            for (JobInstance instance : jobExplorer.getJobInstances(jobName, 0, 1)) {
                // for each of the executions
                for (JobExecution execution : jobExplorer.getJobExecutions(instance)) {
                    if (execution.getStatus().equals(BatchStatus.STARTED) && execution.getCreateTime().before(jvmStartTime)) {
                        // this job is broken and must be restarted
                        execution.setEndTime(new Date());
                        execution.setStatus(BatchStatus.FAILED);
                        execution.setExitStatus(ExitStatus.FAILED);

                        for (StepExecution se : execution.getStepExecutions()) {
                            if (se.getStatus().equals(BatchStatus.STARTED)) {
                                se.setEndTime(new Date());
                                se.setStatus(BatchStatus.FAILED);
                                se.setExitStatus(ExitStatus.FAILED);
                                jobRepository.update(se);
                            }
                        }

                        jobRepository.update(execution);
                        try {
                            jobOperator.restart(execution.getId());
                        }
                        catch (JobExecutionException e) {
                            LOG.warn("Couldn't resume job execution {}", execution, e);
                        }
                    }
                }
            }
        }
    };
}
Coacher answered 1/6, 2018 at 9:28 Comment(3)
Do you have any ideas about how to deal with this situation in a clustered environment? I ran into this issue just now: #51569154 – Ginder
@Ginder Not really. – Coacher
Usually, batch logic would be placed into its own application and not combined with the clustered web app. However, this is not always possible. To ensure that this logic runs on only one node, an easy solution is to place a distributed lock around it so only one node in the cluster can execute it at a time. Check out Hazelcast's distributed lock feature. – Buttery
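
A minimal sketch of that distributed-lock idea, assuming a Hazelcast 4+ instance is available (the lock name and wrapper class are illustrative, not part of the answer above):

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;

public class ClusterSafeRestart {

    private final HazelcastInstance hazelcast;

    public ClusterSafeRestart(HazelcastInstance hazelcast) {
        this.hazelcast = hazelcast;
    }

    // Run the restart logic on at most one cluster node at a time.
    public void runExclusively(Runnable restartLogic) {
        FencedLock lock = hazelcast.getCPSubsystem().getLock("batch-restart-lock");
        if (lock.tryLock()) { // other nodes simply skip this round
            try {
                restartLogic.run();
            } finally {
                lock.unlock();
            }
        }
    }
}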

What I would do in your situation is create a step that logs the last processed row to a file. Then create a second job that reads this file and starts processing from that row number.

So if the job stops for whatever reason, you will be able to run the new job, which will resume the processing; see the sketch below.
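
A minimal sketch of the second job's reader under that approach, assuming the first job wrote the last processed row number to progress.txt (the file names, paths, and bean name are illustrative):

import java.nio.file.Files;
import java.nio.file.Path;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.FileSystemResource;

@Bean
@StepScope
public FlatFileItemReader<String> resumingReader() throws Exception {
    // Read the last processed row number logged by the previous run.
    int lastProcessedRow = Integer.parseInt(Files.readString(Path.of("progress.txt")).trim());

    FlatFileItemReader<String> reader = new FlatFileItemReader<>();
    reader.setResource(new FileSystemResource("input/data.txt")); // illustrative path
    reader.setLinesToSkip(lastProcessedRow); // resume after the logged row
    reader.setLineMapper(new PassThroughLineMapper());
    return reader;
}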

Swordplay answered 5/3, 2013 at 19:30 Comment(0)

You can also write it as below:

@RequestMapping(value = "/updateStatusAndRestart/{jobId}/{stepId}", method = GET)
public ResponseEntity<String> updateBatchStatus(@PathVariable("jobId") Long jobExecutionId,
        @PathVariable("stepId") Long stepExecutionId) throws Exception {

    // Mark the interrupted step execution as FAILED
    StepExecution stepExecution = jobExplorer.getStepExecution(jobExecutionId, stepExecutionId);
    stepExecution.setEndTime(new Date(System.currentTimeMillis()));
    stepExecution.setStatus(BatchStatus.FAILED);
    stepExecution.setExitStatus(ExitStatus.FAILED);
    jobRepository.update(stepExecution);

    // Mark the parent job execution as FAILED as well
    JobExecution jobExecution = stepExecution.getJobExecution();
    jobExecution.setEndTime(new Date(System.currentTimeMillis()));
    jobExecution.setStatus(BatchStatus.FAILED);
    jobExecution.setExitStatus(ExitStatus.FAILED);
    jobRepository.update(jobExecution);

    // Restart the job from the last committed chunk
    jobOperator.restart(jobExecution.getId());

    return new ResponseEntity<String>("<h1> Batch Status Updated !! </h1>", HttpStatus.OK);
}

Here I have used a REST API endpoint to pass the jobExecutionId and stepExecutionId, set the status of both the job execution and the step execution to FAILED, and then restart the job using the JobOperator.

Plasmosome answered 20/8, 2020 at 17:47 Comment(2)
Is there any way to restart the job/step from where it left off? – Amias
In that case you need to find the jobExecutionId and stepExecutionId for the failed step from the DB; then you can restart without providing the jobId or stepId in the request parameters. – Plasmosome
