본문 바로가기
Spring|Spring-boot/Spring Batch

Batch 성능 개선

by oncerun 2023. 5. 29.
반응형

 

https://github.com/sungil-yu/exampleBatch/commit/777ca90ddd160d174c14f2474c4b30de8bfc22b0

 

order statistics & decider · sungil-yu/exampleBatch@777ca90

Show file tree Showing 10 changed files with 241 additions and 1,254 deletions.

github.com

 

다음 코드를 기반으로 성능을 개선하는 포인트를 알아본다. 

 

 

Test를 위한 데이터를 만드는 SaveUserTasklet에서 더 많은 유저를 저장하도록 코드를 변경한다.

 

@Bean
public Job userJob() throws Exception {
    return this.jobBuilderFactory.get("userJob")
            .incrementer(new RunIdIncrementer())
            .start(this.saveUserStep())
            .next(this.userLevelUpStep())
            .listener(new LevelUpJobExecutionListener(userRepository))
            .next(new JobParametersDecide("date"))
            .on(JobParametersDecide.CONTINUE.getName())
            .to(this.orderStatisticsStep(null))
            .build()
            .build();
}

 

 

현재 Job 설정이다.

 

사실 서로  관련이 없는 부분은 userLevelUpStep()과 orderStatisticsStep()이다. 

 

userLevelUpStep은 테스트 데이터를 읽어 유저의 등급을 올리는 STEP이며, orderStatisticsStep은 테스트 데이터의 일 매출을 통계하여 파일로 추출하는 STEP이기 때문에 병렬처리되어도 아무런 문제가 없다.

 

또한 실제 이는 별도의 배치 Job으로 나누는 것이 맞다.  추후 실제 배치 프로그램을 개발할 때는 책임이 다른 경우 별도의 Job으로 처리할 수 있도록 하자.

 

우선 5만 명의 테스트 유저를 생성한다. 이후 ChunkSize는 1천으로 실행한다.

 

여러 Step을 알아보고 각 성능을 확인하려고 한다.

 

배치의 실행 시간은 다음 리스너의 다음 코드로 확인한다.

long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();

 

 

현재 코드기반의 시간은 42초가 걸렸다.

 

 

Async Processing

 

이는 ItemProcessor와 ItemWriter를 비동기로 실행할 수 있다.

 

이는 Future 기반의 비동기로 spring-batch-integration이 종속성이 필요하다.

implementation 'org.springframework.batch:spring-batch-integration'

 

비동기로 사용할 TaskExecutor는 스프링이 제공하는 기본으로 사용하는 것이 아닌 별도로 정의해서 사용한다.

 

이는 서버의 사양에 따라 적절하게 조절해야 한다.

 

나는 기본 10개의 스레드를 유지하고 최대 20개까지 늘어날 수 있도록 설정했다. 

@Bean
@Primary
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.setThreadNamePrefix("batch-thread-");
    taskExecutor.initialize();
    return taskExecutor;
}

 

기본적으로 구성은 매우 간단한데, 기존 ItemProcessor, ItemWriter를 비동기 클래스에 위임하도록 설정만 진행하면 된다.

private AsyncItemProcessor<User,User> itemProcessor() {
    ItemProcessor<User, User> itemProcessor = user -> {
        if (user.availableLevelUp()){
            return user;
        }
        return null;
    };

    AsyncItemProcessor<User, User> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(itemProcessor);
    asyncItemProcessor.setTaskExecutor(this.taskExecutor);
    return asyncItemProcessor;
}

 

https://github.com/sungil-yu/exampleBatch/commit/accf454d6bc66df16c325c91ecd5246b56455fa7

 

async processing · sungil-yu/exampleBatch@accf454

Show file tree Showing 11 changed files with 248 additions and 23 deletions.

github.com

 

실행 시간은 41초 정도가 걸렸다.

 

 

 

Multi Thread

 

 

Async Step은 ItemProcessor와 ItemWriter만 비동기로 처리했지만 Multi-Thread Step은 Chunk 단위로 멀티 스레딩 처리를 한다.  그렇기에 ItemReader에 대해 Thread-safe 하게 코드를 변경해주어야 한다.

 

paging 기반 itemReader는 thread-safe하기 때문에 이를 사용하는 편이다.

 

이 작업은 멀티 스레드를 적용할 step에 다음과 같은 설정으로 처리할 수 있다.

.taskExecutor(taskExecutor)
.throttleLimit(8)

 

throttleLimit의 설명은 다음과 같다.

 

In the case of an asynchronous taskExecutor(TaskExecutor) the number of concurrent tasklet executions can be throttled (beyond any throttling provided by a thread pool). The throttle limit should be less than the data source pool size used in the job repository for this step.

 

배치에 사용되는 repository의 data source pool size보다 반드시 작아야 하고, 이는 taskExecutro에서 설정한 값보다 더 크게 조절될 수 있다고 합니다.

 

시간은 32초로 상당히 많이 줄었다. 

 

 

Partition Step

 

Async processing은 itemwriter와 itemprocessor가 비동기로 실행되고,

Multi Thread는 chunk 단위로 멀티 스레딩 처리를 하는 반면,

Partition Step은 Master Step을 생성하여 하위 여러 개의 Slave Step을 생성하여 step 기준으로 multi-thread를 처리한다.

 

즉 item이 10만개가 있다고 가정한고,  slave step 10개로 처리하고 싶다면,

10만/10, 각 1만 개씩 나누어서 각 멀티 스레딩 처리한다. 

 

이를 위해선 Partitioner 인터페이스를 구현하는 별도의 클래스가 필요하다.

public interface Partitioner {

   /**
    * Create a set of distinct {@link ExecutionContext} instances together with
    * a unique identifier for each one. The identifiers should be short,
    * mnemonic values, and only have to be unique within the return value (e.g.
    * use an incrementer).
    * 
    * @param gridSize the size of the map to return
    * @return a map from identifier to input parameters
    */
   Map<String, ExecutionContext> partition(int gridSize);

}

 

partition 메소드에서 각 slave step에서 사용하게 될 ExecutionContext을 정의하여 리턴하면 이를 각 slave step에서 사용한다.

 

예를 들면 5만 개의 아이템을 처리하기 위해 최대 최소의 분할 값을 가져온 후 gridSize(partitionSize)로 나누어 각 step마다 처리하게 될 아이템 사이즈를 구한 후 ExecutionContext에 설정하여 Map으로 반환한다.

 

여기서 알아두어야 할 점은 ExecutionContext를 사용하기 위해선 StepScope로 설정되어야 하고, StepScope로 설정되기 위해선 Bean으로 등록되어야 한다.

 

그렇다면 ExecutionContext에 저장된 데이터에 접근해서 각 step의 범위를 나누어주어야 하는데, 어떻게 접근해야 하는지 모를 수 있다. 

이는 Spring EL을 활용하면 파라미터 주입이 가능하다. 

@Bean
@StepScope
public ItemReader<? extends User> itemReader(
    @Value("#{stepExecutionContext[minId]}") Long minId, 
    @Value("#{stepExecutionContext[maxId]}") Long maxId) throws Exception {...}

 

파라미터에 Spring EL을 통해 데이터를 주입받도록 메소드메서드 시그니처가 변경되는 이를 사용하는 step에서 컴파일 오류가 발생할 수 있는데, 이때는 null을 넣어주어도 메서드 실행 전 spring el을 통해 데이터를 주입받을 수 있어 문제가 없다.

 

Partition Step을 설정하는 것은 꽤나 복잡한데 이에 대한 코드는 다음과 같다.

https://github.com/sungil-yu/exampleBatch/commit/182c6d7c80cf83618a207bc3effb879bf0d1e979

 

partition step · sungil-yu/exampleBatch@182c6d7

Show file tree Showing 3 changed files with 277 additions and 0 deletions.

github.com

 

시간은 26초로 좋은 성능을 보여준다.

 

사실 대부분의 시간은 테스트 데이터를 생성하는 부분에 있어 23초정도가 걸렸고, 이는 실제 모든 배치를 처리하는데 약 3초 정도가 걸린다는 이야기다. 

 

아이템을 읽는 부분을 파티셔닝하였다면 기존 Async Processing을 적용하여 ItemProcessor와 ItemWriter까지 비동기로 처리하면 완벽하게 비동기로 수행되는 배치프로그램이 된다. 

 

 

Parallel Step

 

 

말 그대로 하나의 Job에 여러 개의 step이 존재했을 때 각 step이 상호관계가 없는 경우 병렬로 실행할 수 있다. 

 

여기선 FlowStep을 이용한다.

 

병렬 처리 기준으로 각 step을 나눈뒤 이를 Flow로 사용하는 것인데, 병렬 처리할 step을 다음과 같이 추가하여 병렬로 실행하도록 할 수 있다.

 

return new FlowBuilder<Flow>(JOB_NAME + "_splitFlow")
        .split(this.taskExecutor)
        .add(userLevelUpFlow(), orderStatisticsFlow(date))
        .build();

 

 

다음과 같이 구성해보자.

 

A step과 B, C step을 Flow로 구성한다.  즉 Parallel Step을 적용한다.

 

이로 인해 B, C step은 병렬로 실행된다. 

 

이후 Parallel Step의 B, C를 묶은 Flow에서 B step에 Partition Step을 적용하여 gridSize 별로 스텝을 나누어 멀티 스레딩 처리를 한다. 

 

각 스레드가 읽은 grid Items 들을 이제 processor, itemwriter 순으로 처리될 것인데, 이 또한 비동기로 처리하도록 감싼다.

 

이를 처리했을 때 사실 큰 시간차이를 보이지 않고 동일한 26초가 나왔다. 이를 위해선 정확히 배치가 사용하는 스레드를 계산하고 이를 스레드풀의 적절한 설정을 해주어야 할 것 같다. 

 

 

@Bean(JOB_NAME + "_splitFlow")
@JobScope
public Flow splitFlow(@Value("#{jobParameters[date]}") String date) throws Exception {
    return new FlowBuilder<Flow>(JOB_NAME + "_splitFlow")
            .split(this.taskExecutor)
            .add(userLevelUpFlow(), orderStatisticsFlow(date))
            .build();
}

 

스레드 사용량을 대략적으로 유추하고, 이를 설정하여 다시 실행했더니 조금 더 줄었다. 

 

saveuser bacth time : 23 s68 ms  ,  batch time : 25 s848 ms, total data processed : 40000  level up user

근데, 순간 메모리 사용률이 1.5GB 정도까지 확 올라갔는데, 너무 많은 자원을 쓰는 거 아닌가 싶다.

 

성능 테스트는 역시 환경에 맞게 여러 번 시간을 두고 테스트하는 게 맞기 때문에 각 방법이 최고의 선택이라고 절대 말할 수 없다. 

 

다만 다양한 방법을 익혔고, 만약 필요하다면 위 코드를 확인하고 필요한 상황에 적합한 방법을 적용해보고 이를 반복하여 좋은 성능을 가지도록 개선하도록 해야겠다. 

 

 

 

 

 

 

 

 

 

 

 

 

 

반응형

'Spring|Spring-boot > Spring Batch' 카테고리의 다른 글

Spring Batch Test Code  (0) 2023.05.28
Batch 예제  (0) 2023.05.24
Spring Batch 아키텍처  (0) 2023.05.21
Spring Batch about 15 minutes  (0) 2023.05.14
Batch란  (1) 2023.05.14

댓글