How to safely pass params from Tasklet to step when running parallel jobs
I am trying to safely pass params from a tasklet to a step in the same job.

My job consists of 3 tasklets (step1, step2, step3) executed one after another, followed by a step4 (reader, processor, writer).

This job is executed many times in parallel.

In step1, inside the tasklet, I evaluate a param (hashId) via a web service, then I pass it along my chain until my reader (which is in step4).

In step3 I create a new param called filePath, which is based on hashId, and I send it over to step4 (the reader) as the file resource location.

I am using the StepExecution to pass these params (hashId and filePath).

I tried 3 ways of doing it via the tasklet:

To pass the param (hashId) from step1 into step2 and from step2 into step3, I am doing this:

chunkContext.getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .put("hashId", hashId);

In step3 (the DownloadFileTasklet) I populate filePath based on hashId and pass it this way to my last step (the reader, processor and writer):

public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
    ...

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        String hashId = (String) chunkContext.getStepContext()
                .getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .get("hashId");

        ...

        filePath = "..." + hashId + ".csv";
        // I used executionContextPromotionListener here in order to promote those keys

        chunkContext.getStepContext()
                .getStepExecution()
                .getExecutionContext()
                .put("filePath", filePath);

        logger.info(filePath + " for hashId=" + hashId);
        return RepeatStatus.FINISHED;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

Note that I print the hashId and filePath values right before that step (step3) finishes. According to the logs they are consistent and populated as expected.

I also added logs within my reader to log the params that it receives:

@Bean
@StepScope
public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
    logger.info("test filePath=" + filePath);

    return itemReader;
}

When I execute this job ~10 times in parallel, I can see that the filePath param is sometimes populated with other jobs' filePath values.

This is how I promote the job's keys with the ExecutionContextPromotionListener.

Job definition:

@Bean
public Job processFileJob() throws Exception {
    return this.jobs.get("processFileJob")
            .start(step1)
            .next(step2)
            .next(downloadFileTaskletStep())  // step3
            .next(processSnidFileStep())      // step4
            .build();
}

Step 3 definition:

public Step downloadFileTaskletStep() {
    return this.steps.get("downloadFileTaskletStep")
            .tasklet(downloadFileTasklet())
            .listener(executionContextPromotionListener())
            .build();
}


@Bean
public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
    ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
    executionContextPromotionListener.setKeys(new String[]{"filePath"});
    return executionContextPromotionListener;
}

Same result: the threads mess up the params.

I can track the results via the Spring Batch database table batch_job_execution_context, column short_context.

Here you can see that the filePath, which is built from the hashId, does not match the original hashId (incorrect record):

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.csv"]},{"string":["hashId","20df39d201fffc7444423cfdf2f43789"]}]}]}

The other records seem fine, but there are always one or two that get messed up.

Correct records:

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}   

Any idea why I have these threading issues?

Epicurus answered 21/4, 2015 at 15:40 Comment(11)
How exactly are you running your "job" concurrently? Each job will have a separate context, therefore params will be unique per each job execution.Sartin
I am just executing the same job many times at once via the jobLauncher. The initial params are safe, but I create a new param (filePath) inside a tasklet and I must send it to my reader, which is in the next step. That value is inconsistent when I execute many jobs in parallel, as if it's not thread-safe.Epicurus
Just out of curiosity, try @Bean @Scope("prototype")Sartin
Cannot find symbol @Scope("prototype"). Btw: using Spring Boot.Epicurus
Inside the tasklet I am actually printing the right params to make sure. But when I print inside the reader step (while running many jobs in parallel), in the results you can see that the reader got the same filePath value twice, meaning the param got mixed up with another thread.Epicurus
Yeah, because it's a singleton, not a prototype, and it's being reused. How about adding a step listener to set that value in beforeStep?Sartin
I tried that also. I promoted the keys (filePath) and then used @BeforeStep. Again I can see the params being passed, but when running in parallel they get mixed up with other threads (probably).Epicurus
Edited my question with the impl.Epicurus
Can you try to work around the problem by using filePath as a JobParameter? That way the filePath param should be spanned across steps safely.Cheeseparing
That would be great, but how do you pass a param from a tasklet into a step using the JobParameters? I tried it and never managed. Could you please answer this question with an example of how to do so?Epicurus
As Michael Minella said, they are immutable; you can't set them during a job, and I am setting this value only during my job chain, so I can't know it from the beginning. I could split the job into two jobs and then set it in the second part, but that's bad design.Epicurus
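The failure mode the comments above describe (a singleton bean holding per-execution state in an instance field) can be reproduced in plain Java, with no Spring involved. The following is a hypothetical illustration — the class and field names are invented — that forces an interleaving in which one "job" builds its filePath from another job's hashId:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Invented stand-in for a singleton tasklet that keeps per-execution
// state in an instance field shared by every parallel "job execution".
class SingletonTasklet {
    private String hashId; // ONE field shared across all executions

    void setHashId(String h) { this.hashId = h; }
    String buildFilePath()  { return this.hashId + ".csv"; }
}

public class RaceDemo {
    // Latches force the interleaving: A writes, B overwrites, then A reads.
    static String runRace() throws InterruptedException {
        SingletonTasklet shared = new SingletonTasklet();
        CountDownLatch aWrote = new CountDownLatch(1);
        CountDownLatch bWrote = new CountDownLatch(1);
        AtomicReference<String> observedByA = new AtomicReference<>();

        Thread jobA = new Thread(() -> {
            shared.setHashId("aaa111");      // job A stores its hashId
            aWrote.countDown();
            try { bWrote.await(); } catch (InterruptedException ignored) {}
            observedByA.set(shared.buildFilePath()); // reads B's value!
        });
        Thread jobB = new Thread(() -> {
            try { aWrote.await(); } catch (InterruptedException ignored) {}
            shared.setHashId("bbb222");      // job B overwrites the field
            bWrote.countDown();
        });

        jobA.start(); jobB.start();
        jobA.join();  jobB.join();
        return observedByA.get();
    }

    public static void main(String[] args) throws Exception {
        // Job A wrote aaa111 but builds its file path from bbb222:
        System.out.println("jobA filePath=" + runRace());
    }
}
```

This is exactly the "one or two records messed up" pattern: the corruption only appears when two executions happen to overlap at the wrong moment.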
To review your attempted methods:

  • Method 1 - Editing JobParameters. JobParameters are immutable, so they should not be modified during job execution.
  • Method 2 - Editing JobParameters v2. Method 2 is really the same as method 1; you're just obtaining the reference to the JobParameters a different way.
  • Method 3 - Using the ExecutionContextPromotionListener. This is the correct way, but you're doing things incorrectly. The ExecutionContextPromotionListener looks at the step's ExecutionContext and copies the keys you specify over to the job's ExecutionContext. You're adding the keys directly to the job's ExecutionContext, which is a bad idea.

So in short, Method 3 is the closest to correct, but you should be adding the properties you want to share to the step's ExecutionContext and then configure the ExecutionContextPromotionListener to promote the appropriate keys to the Job's ExecutionContext.

The code would be updated as follows:

chunkContext.getStepContext()
            .getStepExecution()
            .getExecutionContext()
            .put("filePath", filePath);
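To make the approach concrete, here is a sketch of how the pieces fit together. The bean names follow the question's code; treat the wiring as an assumption rather than a verbatim fix:

```java
// 1) Inside the tasklet (step3), write to the STEP's ExecutionContext
//    (the snippet above), NOT the job's.

// 2) Attach the promotion listener to that step so "filePath" is copied
//    to the JOB's ExecutionContext when the step completes successfully:
@Bean
public Step downloadFileTaskletStep() {
    return this.steps.get("downloadFileTaskletStep")
            .tasklet(downloadFileTasklet())
            .listener(executionContextPromotionListener())
            .build();
}

@Bean
public ExecutionContextPromotionListener executionContextPromotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[]{"filePath"});
    return listener;
}

// 3) A step-scoped bean in step4 can then late-bind the promoted key:
@Bean
@StepScope
public ItemStreamReader<MyDTO> reader(
        @Value("#{jobExecutionContext['filePath']}") String filePath) {
    // build the reader from filePath, e.g. a FlatFileItemReader
    ...
}
```

Because the promoted value lives in each job execution's own ExecutionContext, parallel executions do not share it.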
Oilbird answered 21/4, 2015 at 18:47 Comment(7)
@Thanks for giving me a lead! Just one clarification: "you should be adding the properties you want to share to the step's ExecutionContext..." I'm not sure I understood how to implement your suggestion. I configured the ExecutionContextPromotionListener with the keys and added it as a listener to the downloadFileTaskletStep. How would you modify that implementation? Thank you!Epicurus
So I should use the chunkContext and not the stepExecution which I initialised in beforeStep?Epicurus
Hi Michael, I tried it. I executed the job 14 times at once and I can see one inconsistent record, meaning that inside the tasklet I print hashId=x and when I get to the step (within the same job) I print hashId=y (from another job execution). I can track that via the database table batch_job_execution_context, column short_context.Epicurus
Hi Michael, I wonder if you looked at my edited question. I provided more details. This bug is really stopping our production deployment. Any idea if that's a problem within the framework or am I missing anything? Thank you.Epicurus
Do you have a github project or gist that illustrates the issue I can look at?Oilbird
I added a gist for you with the relevant files. Let me know if further explanation is needed: gist.github.com/IdanFridman/f35249dd4299518ffb93Epicurus
I was hoping for an executable gist so I could test. However, one thing I notice is that your steps don't have @Bean on them. That means that there are no bean definitions for your steps. Given that Spring Batch uses a few BeanPostProcessors to handle certain functionality, I wouldn't recommend this approach.Oilbird
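Following up on the last comment, a hedged sketch of what declaring the steps as beans might look like, assuming the Java-config builders used in the question:

```java
// Per the comment above: without @Bean there are no bean definitions for
// the steps, so Spring Batch's BeanPostProcessors (which handle step
// registration, scoping, etc.) never get a chance to process them.
@Bean
public Step downloadFileTaskletStep() {
    return this.steps.get("downloadFileTaskletStep")
            .tasklet(downloadFileTasklet())
            .listener(executionContextPromotionListener())
            .build();
}

@Bean
public Step processSnidFileStep() {
    // reader/processor/writer chunk wiring as in the question's step4
    ...
}
```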
