I am trying to pass safely params from tasklet to a step in the same job.
My job consist 3 tasklets(step1,step2,step3) one after another and in the end a step4(processor,reader,writer)
this job is being executed many times in parallel.
In step1 inside the tasklet I am evaluating param(hashId) via web service) than I am passing it all over my chain till my reader (which on step 4)
In step 3 I am creating new param called: filePath which is based on hashid and I send it over to step4(the reader) as a file resource location
I am using stepExecution to pass this param(hashId and filePath).
I tried 3 ways doing it via the tasklet:
to pass the param(hashId from step1 into step2 and from step2 into step 3) I am doing this:
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("hashId", hashId);
In step4 I am populating filePath based on hashId and pass it this way to my last step(which is reader processor and a writer)
public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..
@Override
public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext
executionContext) throws IOException {
String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId");
...
filepath="...hashId.csv";
//I used here executionContextPromotionListener in order to promote those keys
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("filePath", filePath);
}
logger.info("filePath + "for hashId=" + hashId);
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
Pay attention that I am printing hashId and filePath values right before I am finished that step(step3). by the logs they are consistent and populated as expected
I also added logs within my reader to see log the params that I get.
@Bean
@StepScope
public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
logger.info("test filePath="+filePath+");
return itemReader;
}
When I execute this job ~10 times I can see that the param filePath value is populated with other jobs filePath values when executing in parallel
This is how I promote the job's keys with executionContextPromotionListener:
job definition:
@Bean
public Job processFileJob() throws Exception {
return this.jobs.get("processFileJob").
start.(step1).
next(step2)
next(downloadFileTaskletStep()). //step3
next(processSnidFileStep()).build(); //step4
}
step 3 definition
public Step downloadFileTaskletStep() {
return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build();
}
@Bean
public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
executionContextPromotionListener.setKeys(new String[]{"filePath"});
return executionContextPromotionListener;
}
Same results threads messing the params
I can track the results via spring batch database table: batch_job_execution_context.short_context:
here you can see the the filePatch which built by the hashid is not identical to the origin hashId //incorrect record///
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.csv"]},{"string":["hashId","20df39d201fffc7444423cfdf2f43789"]}]}]}
Now if we check other records they seems good. but always one or two messed up
//correct records
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}
Any idea why I have those Threading issues?
@Bean @Scope("prototype")
– SartinfilePath
as ajobParameter
s? in this way filePath param should be spanned across steps safely – Cheeseparing