How to launch Spring Batch Job Asynchronously

I followed the Spring Batch documentation but couldn't get my job to run asynchronously.

I am running the job from a web container, and the job is triggered via a REST endpoint.

I want to get the JobInstance ID and return it in the response before the whole job completes, so that callers can check the status of the job later with that ID instead of waiting. But I couldn't get it to work. Below is the sample code I tried. Please let me know what I am missing or doing wrong.

BatchConfig to make Async JobLauncher

@Configuration
public class BatchConfig {

    @Autowired
    JobRepository jobRepository;


    @Bean
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}

Controller

@Autowired
JobLauncher jobLauncher;

@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
    JobParameters jobParameters = new JobParametersBuilder().
            addLong("time", System.currentTimeMillis())
            .toJobParameters();
    JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
    System.out.println(jobExecution.getJobInstance().getInstanceId());
    System.out.println("OK RESPONSE");
    return jobExecution.getJobInstance().getInstanceId();
}

And JobBuilder as component

@Component
public class BatchComponent {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    public Job customJob(String someParam) throws Exception {
        return jobBuilderFactory.get("personProcessor")
                .incrementer(new RunIdIncrementer()).listener(listener())
                .flow(personPorcessStep(someParam)).end().build();
    }


    private Step personPorcessStep(String someParam) throws Exception {
        return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
                .reader(new PersonReader(someParam)).faultTolerant().
                        skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
                .writer(new PersonWriter()).build();
    }


    private JobExecutionListener listener() {
        return new PersonJobCompletionListener();
    }

    private class PersonInput {
        String firstName;

        public PersonInput(String firstName) {
            this.firstName = firstName;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    private class PersonOutput {
        String firstName;

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    public class PersonReader implements ItemReader<PersonInput> {
        private List<PersonInput> items;
        private int count = 0;

        public PersonReader(String someParam) throws InterruptedException {
            Thread.sleep(10000L); //to simulate processing
            //manipulate and provide data in the read method
            //just for testing i have given some dummy example
            items = new ArrayList<PersonInput>();
            PersonInput pi = new PersonInput("john");
            items.add(pi);
        }

        @Override
        public PersonInput read() {
            if (count < items.size()) {
                return items.get(count++);
            }
            return null;
        }
    }


    public class DataDuplicateSkipper implements SkipPolicy {

        @Override
        public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
            if (exception instanceof DataIntegrityViolationException) {
                return true;
            }
            return true;
        }
    }


    private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {

        @Override
        public PersonOutput process(PersonInput item) throws Exception {
            return null;
        }
    }

    private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
        @Override
        public void write(List<? extends PersonOutput> results) throws Exception {
            return;
        }
    }

    private class PersonJobCompletionListener implements JobExecutionListener {
        public PersonJobCompletionListener() {
        }

        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("JOB COMPLETED");
        }
    }
}

Main Function

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

I am using annotation-based configuration and Gradle with the batch dependency below.

compile('org.springframework.boot:spring-boot-starter-batch')

Please let me know if more information is needed. I couldn't find any example for this common use case.

Thanks for your time.

Hyperbaric answered 8/12, 2018 at 23:12 Comment(8)
Do you get any exception? - Stronghold
No, I don't get any exception. @TinyOS - Hyperbaric
Your configuration seems correct to me. Since you configured an asynchronous task executor in your jobLauncher, it should return the job execution immediately and run the job in a separate thread. Can you elaborate on "couldn't get my job running asynchronously"? - Tungstate
I am not getting the job instance ID immediately in the controller response so that it can be used later to check the status of the job. Instead, the return statement runs only after the job has completed fully. @MahmoudBenHassine - Hyperbaric
In this case, make sure the correct JobLauncher (the one configured with an asynchronous task executor) is injected in your controller. You probably have another jobLauncher with a synchronous task executor that runs the job to completion before returning the job execution. - Tungstate
@MahmoudBenHassine thanks. After your suggestion I used the bean with a Qualifier, but the result is the same: I get the ID only when the job completes. Here is what I get in the IDE console when I hit the controller: - Hyperbaric
2
OK RESPONSE
2018-12-10 20:04:58.726 INFO 82585 --- [cTaskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=personProcessor]] launched with the following parameters: [{time=1544452488708}]
2018-12-10 20:04:58.740 INFO 82585 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [personProcessStep]
JOB COMPLETED
2018-12-10 20:04:58.754 INFO 82585 --- [cTaskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=personProcessor]] completed with the following parameters: [{time=1544452488708}] and the following status: [COMPLETED] - Hyperbaric
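
A possible reading of the console output above: "OK RESPONSE" is printed before the job's own log lines, which appear on a cTaskExecutor thread, so the launch itself seems to be asynchronous. In the question's code, the Thread.sleep(10000L) sits in the PersonReader constructor, and that constructor runs while customJob("paramhere") is evaluated on the request thread, before jobLauncher.run is called. The time job parameter also appears to be about ten seconds older than the "launched" line, assuming the log is in UTC+5:30. The plain-JDK sketch below illustrates only that general effect; SlowReader, ASYNC and the method names are hypothetical stand-ins, not Spring Batch classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Plain-JDK sketch; SlowReader and ASYNC are hypothetical stand-ins,
// not Spring Batch classes.
class EagerWorkSketch {

    // Starts every task on a new thread named "job-worker".
    static final Executor ASYNC = task -> new Thread(task, "job-worker").start();

    // Records which thread ran the constructor, where the slow work would happen.
    static final class SlowReader {
        final String constructedOn = Thread.currentThread().getName();
    }

    // Reader built while preparing the job: the constructor runs on the caller.
    static String constructedEagerly() {
        SlowReader reader = new SlowReader();
        CompletableFuture.runAsync(() -> { /* the job would read items here */ }, ASYNC).join();
        return reader.constructedOn;
    }

    // Reader built inside the task: the constructor runs on the worker thread.
    static String constructedLazily() {
        return CompletableFuture.supplyAsync(() -> new SlowReader().constructedOn, ASYNC).join();
    }

    public static void main(String[] args) {
        System.out.println("eager: " + constructedEagerly());
        System.out.println("lazy:  " + constructedLazily());
    }
}
```

If this is what happens in the question, moving the slow work out of the constructor (for example into read(), or into a step-scoped bean) would keep it off the request thread.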
B
16

Try this: in your configuration, create a custom JobLauncher with a SimpleAsyncTaskExecutor and name it with @Bean(name = "myJobLauncher"). Then use the same name with @Qualifier in your controller, so that this launcher is injected rather than the default one.

@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

In your Controller

@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;
Branching answered 23/3, 2019 at 11:4 Comment(1)
Please don't write code-only answers; try to elaborate by explaining what you're doing. - Proliferate
L
6

Looking at your code, I see a couple of mistakes. First, your custom config is not loaded: if it were, injection would fail because there would be two beans of the same interface.

There's a lot of magic in Spring Boot, but if you don't tell it to do a component scan, nothing will be loaded as expected.

The second problem I can see is in your BatchConfig class: it neither extends DefaultBatchConfigurer nor overrides getJobLauncher(), so even if the Boot magic loads everything, you'll get the default launcher. Here is a configuration that works and is compliant with the documentation (@EnableBatchProcessing API):

BatchConfig

@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {

  @Override
  @Bean
  public JobLauncher getJobLauncher() {
    try {
      SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
      jobLauncher.setJobRepository(getJobRepository());
      jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
      jobLauncher.afterPropertiesSet();
      return jobLauncher;

    } catch (Exception e) {
      // Pass the exception as the last argument so SLF4J logs its stack trace
      log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor, falling back on default", e);
      return super.getJobLauncher();
    }
  }
}

Main Function

@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}
Leverrier answered 19/9, 2019 at 16:34 Comment(2)
@Slf4j is a Lombok annotation; you can omit it and declare the Logger log manually. - Leverrier
This creates a circular dependency between the BatchConfig file and ModularBatchConfiguration from Spring Batch. To make it work, just remove modular = true from the annotation in BatchConfig. - Sivas
S
2

Although you have defined your custom jobLauncher, you're running the job with the default jobLauncher provided by Spring. Could you autowire simpleJobLauncher in your controller and give it a try?

Steiner answered 5/3, 2019 at 10:36 Comment(1)
I agree; his only problem is that he was autowiring the default jobLauncher instead of the simpleJobLauncher bean he created, which has to be referenced by name. - Lilongwe
M
2

I know this is an old question, but I am posting this answer anyway for future readers.

After reviewing your code I can't tell why you have this problem, but I suggest using a Qualifier annotation together with a ThreadPoolTaskExecutor, as shown below, and seeing whether it solves your problem.

You may also check this tutorial for more details: Asynchronous Spring Batch Job Processing. It will help you configure a Spring Batch job asynchronously. This tutorial was written by me.

@Configuration
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Bean
    public TaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(12);
        executor.setCorePoolSize(8);
        executor.setQueueCapacity(15);
        return executor;
    }

    @Bean
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
        return jobLauncher;
    }
}
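
For readers unsure how core size, max size and queue capacity interact: ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, which starts threads up to the core size, then queues tasks, and only grows toward the max size once the queue is full. The sketch below uses the JDK class directly with smaller numbers (core 2, max 3, queue 2) than the configuration above, purely to keep the trace short; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch of pool growth; sizes are illustrative, not the answer's values.
class PoolSizingSketch {

    // Submits six blocking tasks and records "poolSize/queueSize" after each accepted one.
    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));
        Runnable blocked = () -> {
            try {
                release.await(); // keep the thread busy until the trace is complete
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            for (int i = 1; i <= 6; i++) {
                pool.execute(blocked);
                trace.append(pool.getPoolSize()).append('/')
                     .append(pool.getQueue().size()).append(' ');
            }
        } catch (RejectedExecutionException e) {
            trace.append("rejected"); // queue full and pool already at max size
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints "1/0 2/0 2/1 2/2 3/2 rejected"
    }
}
```

With a bounded queue and the default rejection policy, a launch request that arrives when both the pool and the queue are full is rejected rather than delayed, which is worth keeping in mind when sizing a pool behind a REST endpoint.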
Mythologize answered 26/9, 2019 at 19:3 Comment(0)
B
1

Look at JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);. By default, the JobLauncher waits until the job has completed before returning anything, which is probably why your service takes so long to respond, if that is your problem. If you want asynchronous capabilities, you might want to look at Spring's @EnableAsync and @Async.

@EnableAsync

Binni answered 9/12, 2018 at 9:34 Comment(3)
@EnableAsync is already applied at application level, on my main class. I also need to return the job ID in the response. How do I do that? - Hyperbaric
Alright, my apologies, I didn't catch that. You seem to have the right setup already, except for annotating your workHard() method with @Async as well. SimpleAsyncTaskExecutor should also return the JobExecution instantly, with ExitStatus=Unknown, as stated here. - Binni
If I put @Async on the controller method workHard(), I don't get the job instance ID at all; the request finishes instantly with an empty response. - Hyperbaric
C
1

According to the Spring documentation, returning the HTTP response before the job finishes requires a launcher with an asynchronous task executor such as org.springframework.core.task.SimpleAsyncTaskExecutor.

Any implementation of the spring TaskExecutor interface can be used to control how jobs are asynchronously executed.
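
To make the role of the executor concrete, here is a plain-JDK sketch (no Spring on the classpath) of a launcher that simply hands the job to a java.util.concurrent.Executor, which Spring's TaskExecutor extends. LauncherSketch and its methods are hypothetical names; the two executors only approximate what SyncTaskExecutor and SimpleAsyncTaskExecutor do.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Plain-JDK sketch of a launcher that delegates to an Executor; hypothetical,
// not the Spring Batch implementation.
class LauncherSketch {

    // Hands the job to the executor and returns as soon as execute() returns.
    static void launch(Executor executor, Runnable job) {
        executor.execute(job);
    }

    // Reports whether the job body ran on the thread that called launch().
    static boolean ranOnCallerThread(Executor executor) {
        AtomicReference<Thread> worker = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        launch(executor, () -> {
            worker.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await(); // wait so the worker thread is recorded before comparing
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        Executor sync = Runnable::run;                      // runs in the caller
        Executor async = task -> new Thread(task).start();  // one new thread per task
        System.out.println("sync on caller:  " + ranOnCallerThread(sync));
        System.out.println("async on caller: " + ranOnCallerThread(async));
    }
}
```

With the synchronous executor, launch() cannot return until the job body has finished, which is the blocking behaviour described in this thread; with the thread-per-task executor it returns right after the hand-off.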

spring batch documentation

<bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>
</bean>

Corollary answered 29/8, 2019 at 8:21 Comment(0)
B
0

If you're using Lombok this might help you:

TL;DR: Lombok's @AllArgsConstructor doesn't seem to work well with the @Qualifier annotation. EDIT: you have to enable @Qualifier in the lombok.config file to be able to use it with @AllArgsConstructor, like this:

lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier

I know this is an old question, but I had the exact same problem and none of the answers solved it.

I configured the async job launcher like this and added the qualifier to make sure this jobLauncher is injected:

@Bean(name = "asyncJobLauncher")
public JobLauncher simpleJobLauncher(JobRepository jobRepository) throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

And injected it like this

@Qualifier("asyncJobLauncher")
private final JobLauncher jobLauncher;

I was using Lombok's @AllArgsConstructor. After changing it to field injection with @Autowired, the correct job launcher got injected and the job is now executed asynchronously:

@Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher jobLauncher;

Also, I didn't have to extend my configuration from DefaultBatchConfigurer.

Breechblock answered 11/2, 2021 at 16:47 Comment(1)
I am using Spring Batch 5. When I try to run the job asynchronously, I get the error below: "PreparedStatementCallback; uncategorized SQLException for SQL [INSERT INTO BATCH_JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION) VALUES (?, ?, ?, ?) ]; SQL state [72000]; error code [8177]; ORA-08177: can't serialize access for this transaction" - Peony
B
0

Create a new Java file named BatchParallelProcessingConfiguration.java and add the following code:

import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
// @EnableBatchProcessing
public class BatchParallelProcessingConfiguration {

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    // The JobRepository is injected by Spring; the launcher cannot initialize without one.
    @Bean
    public JobLauncher jobLauncher(JobRepository jobRepository) {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor());
        return jobLauncher;
    }
}

To override the default jobLauncher bean provided by Spring Boot, add the property below to application.properties.

spring.main.allow-bean-definition-overriding=true

That's it: your job can now be launched asynchronously, and several launches can run in parallel.

Bocage answered 21/12, 2023 at 13:7 Comment(0)
