I have followed the spring batch doc and couldn't get my job running Asynchronously.
So I am running the Job from a web container and the job will be triggered via a REST end point.
I wanted to get the JobInstance ID to pass it in response before completing the whole job. So they can check the status of the job later with the JobInstance ID instead of waiting. But I couldn't get it work. Below is the sample code I tried. Please let me know what am I missing or wrong.
BatchConfig to make Async JobLauncher
@Configuration
public class BatchConfig {
@Autowired
JobRepository jobRepository;
@Bean
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}
Controller
@Autowired
JobLauncher jobLauncher;
@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().
addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
System.out.println(jobExecution.getJobInstance().getInstanceId());
System.out.println("OK RESPONSE");
return jobExecution.getJobInstance().getInstanceId();
}
And JobBuilder as component
@Component
public class BatchComponent {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
public Job customJob(String someParam) throws Exception {
return jobBuilderFactory.get("personProcessor")
.incrementer(new RunIdIncrementer()).listener(listener())
.flow(personPorcessStep(someParam)).end().build();
}
private Step personPorcessStep(String someParam) throws Exception {
return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
.reader(new PersonReader(someParam)).faultTolerant().
skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
.writer(new PersonWriter()).build();
}
private JobExecutionListener listener() {
return new PersonJobCompletionListener();
}
private class PersonInput {
String firstName;
public PersonInput(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
private class PersonOutput {
String firstName;
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
public class PersonReader implements ItemReader<PersonInput> {
private List<PersonInput> items;
private int count = 0;
public PersonReader(String someParam) throws InterruptedException {
Thread.sleep(10000L); //to simulate processing
//manipulate and provide data in the read method
//just for testing i have given some dummy example
items = new ArrayList<PersonInput>();
PersonInput pi = new PersonInput("john");
items.add(pi);
}
@Override
public PersonInput read() {
if (count < items.size()) {
return items.get(count++);
}
return null;
}
}
public class DataDuplicateSkipper implements SkipPolicy {
@Override
public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
if (exception instanceof DataIntegrityViolationException) {
return true;
}
return true;
}
}
private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {
@Override
public PersonOutput process(PersonInput item) throws Exception {
return null;
}
}
private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
@Override
public void write(List<? extends PersonOutput> results) throws Exception {
return;
}
}
private class PersonJobCompletionListener implements JobExecutionListener {
public PersonJobCompletionListener() {
}
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("JOB COMPLETED");
}
}
}
Main Function
@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
I am using annotation based configurations and use gradle with the below batch package.
compile('org.springframework.boot:spring-boot-starter-batch')
Please let me know if some more info needed. I couldn't find any example to run this common use case.
Thanks for you time.
couldn't get my job running Asynchronously
? – Tungstatenot
getting Job Instance ID immediately in the controller response, so that it can be used later to check the status of the job. But here the statement is ran only after the job completed fully. @MahmoudBenHassine – HyperbaricJobLauncher
(the one configured with a asynchronous task executor) is injected in your controller. Probably you have another jobLauncher with a synchronous task executor that runs the job until completion before returning the job execution. – TungstateQualifier
but still the same, I get ID only when the job completes. Here is what I get in IDE console when I hit the controller – Hyperbaric2 OK RESPONSE 2018-12-10 20:04:58.726 INFO 82585 --- [cTaskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=personProcessor]] launched with the following parameters: [{time=1544452488708}] 2018-12-10 20:04:58.740 INFO 82585 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [personProcessStep] JOB COMPLETED
– Hyperbaric2018-12-10 20:04:58.754 INFO 82585 --- [cTaskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=personProcessor]] completed with the following parameters: [{time=1544452488708}] and the following status: [COMPLETED]
– Hyperbaric