Spring Batch partitioning with multiple steps in parallel?

4

6

I have implemented Spring Batch partitioning for a single step, where a master step delegates its work to several slave threads which then get executed in parallel, as shown in the following image from the Spring docs: [image]. Now what if I have multiple steps which are to be executed in parallel? How do I configure them in the batch configuration? My current configuration is:

 <batch:job id="myJob" restartable="true" job-repository="jobRepository" >
        <batch:listeners>
            <batch:listener ref="myJoblistener"></batch:listener>
        </batch:listeners>

        <batch:step id="my-master-step">
            <batch:partition step="my-step" partitioner="my-step-partitioner" handler="my-partitioner-handler">
            </batch:partition>
        </batch:step>
    </batch:job>

    <batch:step id="my-step" >
        <batch:tasklet ref="myTasklet" transaction-manager="transactionManager" >
        </batch:tasklet>
        <batch:listeners>
            <batch:listener ref="myStepListener"></batch:listener>
        </batch:listeners> 
    </batch:step>

My architecture should look like the following diagram: [image]

I am not even sure whether this is possible using Spring Batch. Any ideas, or am I in way over my head trying to implement it? Thank you.

Waller answered 29/4, 2014 at 6:23 Comment(3)
Hi, see this post, which outlines how to use the flow and split elements. I'm trying to do exactly the same as you but am still stuck with it. javaetmoi.com/2012/12/… - Capo
This is a second post I found related to this topic. forum.spring.io/forum/spring-projects/batch/… - Capo
I had a similar issue, please take a look at #33121676 - Inexecution
3

You can try the following.

 <batch:job id="myJob" restartable="true" job-repository="jobRepository" >
        <batch:listeners>
            <batch:listener ref="myJoblistener"></batch:listener>
        </batch:listeners>

        <batch:step id="my-master-step">
            <batch:partition step="my-step" partitioner="my-step-partitioner" handler="my-partitioner-handler">
            </batch:partition>
        </batch:step>
    </batch:job>

    <batch:step id="my-step" >
        <batch:job ref="MyChildJob" job-launcher="jobLauncher"
                job-parameters-extractor="jobParametersExtractor" />
        <batch:listeners>
            <batch:listener ref="myStepListener"></batch:listener>
        </batch:listeners> 
    </batch:step>

    <batch:job id="MyChildJob" restartable="false"
        xmlns="http://www.springframework.org/schema/batch">
        <batch:step id="MyChildStep1" next="MyChildStep2">
            <batch:tasklet ref="MyChildStep1Tasklet" transaction-manager="transactionManager" >
            </batch:tasklet>
        </batch:step>

        <batch:step id="MyChildStep2" next="MyChildStep3">
            <batch:tasklet ref="MyChildStep2Tasklet" transaction-manager="transactionManager" >
            </batch:tasklet>
        </batch:step>

        <batch:step id="MyChildStep3">
            <batch:tasklet ref="MyChildStep3Tasklet" transaction-manager="transactionManager" >
            </batch:tasklet>
        </batch:step>

    </batch:job>
Stateless answered 15/8, 2014 at 15:36 Comment(3)
I don't think it would work. Looking at your configuration, each partition runs a separate job. This is not desirable: I need a single batch job with the behavior described above. Thanks. - Waller
I have configured it the same way and it is working fine in production. If you don't need it as a separate job, configure it as a flow. The flow contains all three steps. - Stateless
Hi @DanglingPiyush. Did you solve your problem? I have the same requirement. Can you help me? - Haematogenesis
0

I had a similar requirement and solved it using the configuration below:

<batch:job id="cycleJob">
        <batch:step id="zStep" next="gStep">
            <batch:partition partitioner="zPartitioner">
                <batch:step>
                    <batch:tasklet throttle-limit="1">
                        <batch:chunk processor="itemProcessor" reader="zReader" writer="itemWriter" commit-interval="1">
                        </batch:chunk>
                    </batch:tasklet>
                </batch:step>
                <batch:handler task-executor="taskExecutor" grid-size="${maxThreads}" />
            </batch:partition>
        </batch:step>
        <batch:step id="gStep" parent="zStep" next="yStep">
            <batch:partition partitioner="gPartitioner">
                <batch:step>
                    <batch:tasklet throttle-limit="1">
                        <batch:chunk processor="itemProcessor" reader="gReader" writer="itemWriter" commit-interval="1">
                        </batch:chunk>
                    </batch:tasklet>
                </batch:step>
                <batch:handler task-executor="taskExecutor" grid-size="${maxThreads}" />
            </batch:partition>
        </batch:step>
</batch:job>
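
Note that in a configuration like this the two partitioned steps still run one after the other: every partition of zStep finishes before gStep starts. The sketch below models that execution order with plain JDK classes only. It is not Spring Batch code; the class and method names are hypothetical, and the thread pool merely stands in for the `taskExecutor` bean.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK model (not Spring Batch API): two partitioned steps executed in sequence.
final class SequentialPartitionedStepsSketch {

    // Runs every partition of one step in parallel and returns only when all have finished.
    static void runPartitionedStep(String step, int gridSize, ExecutorService pool, List<String> events)
            throws InterruptedException {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < gridSize; i++) {
            final int partition = i;
            tasks.add(() -> {
                events.add(step + ":partition" + partition);
                return null;
            });
        }
        // invokeAll blocks until all tasks are done, so it acts as the barrier between steps.
        pool.invokeAll(tasks);
    }

    // Models the job: zStep (all partitions), then gStep (all partitions).
    static List<String> runJob(int gridSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(gridSize);
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        try {
            runPartitionedStep("zStep", gridSize, pool, events);
            runPartitionedStep("gStep", gridSize, pool, events);
        } finally {
            pool.shutdown();
        }
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runJob(3));
    }
}
```

Within one step the partitions may complete in any order, but no gStep entry can appear before the last zStep entry. If each partition should instead run its own z-then-g sequence independently, this layout does not provide that.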
Exhaustless answered 8/4, 2016 at 13:50 Comment(0)
0

Late answer, but I finally found the solution I was originally looking for when coming here, using a flow instead of a child job. So I figured I should post it here as well.

    <job id="myJob">
        <step id="my-master-step">
            <partition partitioner="my-step-partitioner">
                <handler task-executor="my-partitioner-handler" />
                <step>
                    <!-- For each partition, we run the complete flow -->
                    <flow parent="mainFlow" />
                </step>
            </partition>
        </step>
    </job>
    
    <!-- The flow consists of several sequential steps (2 here) -->
    <flow id="mainFlow">
        <step id="MyChildStep1" next="MyChildStep2">
            <!-- Here you can have a tasklet or a chunk of course -->
            <tasklet ref="MyChildStep1Tasklet" />
        </step>
        <step id="MyChildStep2">
            <!-- Same here -->
            <tasklet ref="MyChildStep2Tasklet" />
        </step>
    </flow>
    
    <bean id="MyChildStep1Tasklet" class="..." />
    
    <bean id="MyChildStep2Tasklet" class="..." />

I have not tested running it in parallel but I see no reason why it shouldn't work.
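
To make the intended behavior concrete, here is a minimal plain-JDK model of "one complete flow per partition". It is only a sketch of the execution model, not Spring Batch code: the class and method names are hypothetical, the thread pool stands in for the task executor, and the two log entries stand in for MyChildStep1 and MyChildStep2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK model (not Spring Batch API): each partition runs the whole flow on its own thread.
final class FlowPerPartitionSketch {

    // Stand-in for the sequential flow MyChildStep1 -> MyChildStep2 of a single partition.
    static List<String> runFlow(int partition) {
        List<String> log = new ArrayList<>();
        log.add("MyChildStep1[" + partition + "]");
        log.add("MyChildStep2[" + partition + "]");
        return log;
    }

    // Stand-in for the master step: submit one flow per partition, then wait for all of them.
    static Map<Integer, List<String>> runPartitions(int gridSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(gridSize);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int i = 0; i < gridSize; i++) {
                final int partition = i;
                futures.add(pool.submit(() -> runFlow(partition)));
            }
            Map<Integer, List<String>> results = new TreeMap<>();
            for (int i = 0; i < gridSize; i++) {
                results.put(i, futures.get(i).get()); // blocks until partition i has finished
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        runPartitions(3).forEach((p, log) -> System.out.println(p + " -> " + log));
    }
}
```

The steps inside a partition stay strictly ordered, while different partitions do not wait for each other between steps. That is the difference from chaining several partitioned steps one after the other.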

Coppersmith answered 12/11, 2020 at 13:28 Comment(1)
Is there any way to convert the above XML answer to Java configuration? - Graybeard
0

As @arunkumar-pushparaj asked, here you can find a simple example using Java configuration:

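import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
public class ParallelFlowConfiguration {
// Assumption: the original snippet does not show where "log" comes from; a plain SLF4J logger is used here.
private static final Logger log = LoggerFactory.getLogger(ParallelFlowConfiguration.class);

protected static final int GRID_SIZE = 4;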

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
private JobBuilderFactory jobBuilderFactory;


@Bean
public Job producerProcess() {      
    return this.jobBuilderFactory.get("partitionedJob").incrementer(new RunIdIncrementer())
            .start(step1(null,null))                
            .build();

}

@Bean
public Step step1(PartitionHandler partitionHandler,Partitioner  partitioner) {
    return this.stepBuilderFactory.get("step1").partitioner(myStepFlow().getName(), partitioner)
            .step(myStepFlow()).partitionHandler(partitionHandler)
            .build();
}

@Bean
public Step step2() {
    return this.stepBuilderFactory.get("step2").tasklet((contribution, chunkContext) -> {
        ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution()
                .getExecutionContext();

        Integer partitionNumber = executionContext.getInt("partitionNumber");

        
        log.info("Executing step TWO: {}", partitionNumber);
        return RepeatStatus.FINISHED;
    }).build();

}

@Bean
public Partitioner partitioner() {
    return new Partitioner() {
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {

            Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

            for (int i = 0; i < GRID_SIZE; i++) {
                ExecutionContext context1 = new ExecutionContext();
                context1.put("partitionNumber", i);

                partitions.put("partition" + i, context1);
            }

            return partitions;
        }
    };
}

@Bean
public Flow myFlow() {
    return new FlowBuilder<Flow>("myFlow1").start(step2()).build();
}

@Bean
public Step myStepFlow() {
    return stepBuilderFactory.get("stepFlow")
            
            .flow(myFlow()).build();
}

@Bean 
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
    partitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
    partitionHandler.setStep(myStepFlow());
    return partitionHandler;
}

}
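
The partitioner above only stores a partition number in each context. If each partition should work on its own slice of the data, one common approach is to store a range per partition instead. The following is a plain-JDK sketch of that idea, not part of the configuration above: the inner maps stand in for `ExecutionContext` objects, and the `minId`/`maxId` keys and the class name are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-JDK model: the inner maps stand in for Spring Batch ExecutionContext objects.
final class RangePartitionSketch {

    // Splits the inclusive range [minId, maxId] into at most gridSize contiguous slices.
    // Assumes gridSize >= 1; returns an empty map when the range is empty.
    static Map<String, Map<String, Object>> partition(long minId, long maxId, int gridSize) {
        Map<String, Map<String, Object>> partitions = new LinkedHashMap<>();
        long total = maxId - minId + 1;
        long sliceSize = (total + gridSize - 1) / gridSize; // ceiling division
        long start = minId;
        for (int i = 0; i < gridSize && start <= maxId; i++) {
            long end = Math.min(start + sliceSize - 1, maxId);
            Map<String, Object> context = new LinkedHashMap<>();
            context.put("partitionNumber", i);
            context.put("minId", start); // hypothetical key
            context.put("maxId", end);   // hypothetical key
            partitions.put("partition" + i, context);
            start = end + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        partition(1, 10, 4).forEach((name, context) -> System.out.println(name + " -> " + context));
    }
}
```

In a real `Partitioner` the same values would go into an `ExecutionContext` via `put`, as in the partitioner bean above.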

Vanillic answered 23/1, 2023 at 11:19 Comment(0)
