Spring|Spring-boot/Spring Batch

Spring Batch Parameter

oncerun 2021. 2. 28. 12:53
반응형

 

배치를 실행에 필요한 값을 parameter를 통해 외부에서 주입

JobParameters는 외부에서 주입된 parameter를 관리하는 객체

parameter를 JobParameters와 Spring EL로 접근  (두 가지 사용 가능)

 - String parameter = jobParameters.getString(key, defaultValue);

 - @Value("#{jobParameters [key]}")

 

 

우선 jobParameters를 사용해보자. tasklet을 사용할 때이다.

 private Tasklet tasklet(){

        List<String> items = getItems();

        return (contribution, chunkContext) -> {
            StepExecution stepExecution = contribution.getStepExecution();
            JobParameters jobParameters = stepExecution.getJobParameters();

            String value = jobParameters.getString("chunkSize", "10");
            int chunkSize = StringUtils.isNotEmpty(value) ? Integer.parseInt(value) : 10;

            int fromIndex = stepExecution.getReadCount();
            int toIndex = fromIndex +chunkSize;

            if (fromIndex >= items.size()){
                return RepeatStatus.FINISHED;
            }

            List<String> subList = items.subList(fromIndex, toIndex);
            log.info("task item size : {}" , subList.size());

            stepExecution.setReadCount(toIndex);
            return RepeatStatus.CONTINUABLE;
        };

stepExecution에서 jobParameters객체를 얻어온 다음 설정을 해준다. 

그리고 chunkSize를 파라미터에서 읽어오도록 했습니다. size는  20으로 했습니다.

위 코드는 tasklet을 chunk처럼 분할하는 방식으로 만든 코드입니다. 

 

 

다음 방식은 Spring EL을 사용하는 방식입니다.

 

@Bean
    @JobScope
    public Step chunkBaseStep(@Value("#{jobParameters[chunkSize]}") String chunkSize){
        return  stepBuilderFactory.get("chunkBaseStep")
                .<String,String>chunk(StringUtils.isNotEmpty(chunkSize) ? Integer.parseInt(chunkSize) : 10)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();

    }

Lombok이 아닌 Spring Value어노테이션을 사용해 jobParameters의 값을 전달받아 사용하는 방식입니다.

 

@Scope는 어떤 시점에 bean을 생성/소멸시킬지 bean의 lifecycle을 설정합니다. 

 

여기서 사용한 @JobScope는 job실행 시점에 생성/소멸됩니다. 

 -step에 선언합니다.

 

@StepScope는 step 실행 시점에 생성/소멸

만약 stepScope을 사용하기 위해선 ItemReader, ItemProcessor, ItemWriter를 Bean으로 등록해주어야 한다.

 

  - Tasklet, Chunk(ItemReader, ItemProcessor, ItemWriter)에 선언

 

spring의 @Scope와 같다.

 ex) @Spring("job") = @JobScope

 

Job과 step 라이프사이클에 생성되기 때문에 Thread safe 하게 작동

 

 

왜 Bean을 scope로 관리하는가?

 

여러 step에서 하나에 tasklet을 동시에 실행한다면 Thread safe 하게 동작하지 않습니다.

tasklet의 stepScope설정을 하게 된다면 여러 step이 실행할 때마다 새로운 tasklet을 생성하기 때문에 Thread safe 하게 동작할 수 있습니다. spring의 scope 기본값이 싱글톤이니까?

EL을 사용하기 위해서도 있습니다.

 

@Slf4j
@Configuration
public class ChunkProcessConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;


    public ChunkProcessConfiguration(JobBuilderFactory jobBuilderFactory,
                           StepBuilderFactory stepBuilderFactory){
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public Job chunkProcessingJob(){
        return jobBuilderFactory.get("chunkProcessingJob")
                .incrementer(new RunIdIncrementer())
                .start(this.taskBaseStep())
                .next(this.chunkBaseStep(null))  //null도 정상동작 jobScope설정 덕분
                .build();

    }

    @Bean
    @JobScope
    public Step chunkBaseStep(@Value("#{jobParameters[chunkSize]}") String chunkSize){
        return  stepBuilderFactory.get("chunkBaseStep")
                .<String,String>chunk(StringUtils.isNotEmpty(chunkSize) ? Integer.parseInt(chunkSize) : 10)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();

    }

    private ItemWriter<String> itemWriter() {
        return items -> log.info("chunk item size : {}", items.size());

//        return items -> items.forEach(log::info);
    }

    //item을 가공하거나, writer로 넘길지 말지 결정 null일경우 writer로 넘어가지 않음
    private ItemProcessor<String, String> itemProcessor() {
        return item -> item + ", Spring Batch";
    }

    private ItemReader<String> itemReader() {
        return new ListItemReader<>(getItems()); //100개 String list
    }

    @Bean
    public Step taskBaseStep() {
        return stepBuilderFactory.get("taskBaseStep")
                .tasklet(this.tasklet(null)) //tasklet으로 100개 한번에 처리
                .build();

    }

   /* private Tasklet tasklet() {
        return (contribution, chunkContext) -> {
            List<String> items = getItems();
            log.info("task item size : {}",
                    items.size());
            return RepeatStatus.FINISHED;
        };
    }*/

    @Bean
    @StepScope
    public Tasklet tasklet(@Value("#{jobParameters[chunkSize]}") String value){

        List<String> items = getItems();

        return (contribution, chunkContext) -> {
            StepExecution stepExecution = contribution.getStepExecution();
            JobParameters jobParameters = stepExecution.getJobParameters();



//            String value = jobParameters.getString("chunkSize", "10");
            int chunkSize = StringUtils.isNotEmpty(value) ? Integer.parseInt(value) : 10;

            int fromIndex = stepExecution.getReadCount();
            int toIndex = fromIndex +chunkSize;

            if (fromIndex >= items.size()){
                return RepeatStatus.FINISHED;
            }

            List<String> subList = items.subList(fromIndex, toIndex);
            log.info("task item size : {}" , subList.size());

            stepExecution.setReadCount(toIndex);
            return RepeatStatus.CONTINUABLE;
        };

    }



    private List<String> getItems() {
        List<String> items  = new ArrayList<>();

        for (int i =0; i < 100; i++){
            items.add(i + "hello");
        }
        return items;
    }

}
반응형