https://spring.io/guides/gs/batch-processing/
스프링 배치 아키텍처를 살펴보기 전에 15분 걸린다는 간단한 스프링 배치 실습을 진행해 보자.
준비물은 좋아하는 IDE와 자바 17 그 이상과 그래들 7.5 이상, 메이븐 3.5 이상이니까
자바 17을 먼저 다운로드 받자.
https://www.oracle.com/java/technologies/javase/jdk17-archive-downloads.html
이걸로 소스 받고 initial로 이동하고 클래스 만들기로 이동하라고 한다.
.git clone https://github.com/spring-guides/gs-batch-processing.git
고객이나 비즈니스 분석가가 스프레드시트를 제공한다. 이 간단한 경우 다음과 같은 csv 파일을 준다고 가정한다.
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
이 시트에는 각 행에 쉼표로 구분된 이름과 성이 포함되어 있고, 사용자 지정 없이 처리할 수 있는 일반적인 패턴이다.
데이터를 저장할 SQL 스크립트를 작성한다.
DROP TABLE people IF EXISTS;
CREATE TABLE people (
person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
first_name VARCHAR(20),
last_name VARCHAR(20)
);
HyperSQL 데이터베이스를 사용하라는데 이는 자바로 구성된 관계형 데이터베이스 관리 시스템이라고 한다.
HDB랑 비슷한 거라고 한다.
다음은 CSV 파일에 있는 내용을 담을 비즈니스 클래스를 만듭니다.
package com.example.batchprocessing;
public class Person {
private String lastName;
private String firstName;
public Person() {
}
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public String getLastName() {
return lastName;
}
public String getFirstName() {
return firstName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
@Override
public String toString() {
return "firstName: " + firstName + ", lastName: " + lastName;
}
}
이제 Intermediate Processor를 만듭니다.
배치의 일반적인 패러다임은 데이터를 수집하고 변환한 다음 다른 곳으로 이동시키는 것.
여기서는 이름을 대문자로 변환하는 간단한 변환기를 작성한다고 한다.
PersonItemProcessor를 만들어보자.
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(Person item) throws Exception {
return null;
}
}
process 메서드를 오버라이드 하여 사용하도록 구성되어 있나 보다.
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting (" + person + ") into (" + transformedPerson + ")");
return transformedPerson;
}
}
이제 실제 배치 작업을 하도록 구성해야 하는데, 많은 유틸리티 클래스를 사용하여 비즈니스 논리에 집중할 수 있다고 한다.
이를 위해 Configuration에 다음과 같은 빈을 등록해야 한다.
package com.example.batchprocessing;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
@Configuration
public class BatchConfiguration {
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(Person.class);
}}).build();
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource){
return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.build();
}
}
여기서는 Reader와 writer, processor를 정의했다.
reader()는 ItemReader를 생성하는 코드입니다. 이는 csv 파일을 불러 각 라인을 읽고 이를 Person 객체로 변환한다고 합니다.
processor()는 아까 정의한 대문자로 변환하는 프로세서 인스턴스를 만들어 빈으로 등록합니다.
writer(DataSource)는 ItemWriter를 생성하는데, 이는 JDBC를 대상으로 하며, @EnableBatchProcessing에서 생성된 Datasoruce의 복사본을 자동으로 가져온다고 합니다.
@EnableBatchProcessing 애너테이션은 Spring Batch의 기능을 활성화하고 Spring Batch에 필요한 인프라스트럭처를 설정하는 역할을 합니다. 이 애너테이션을 사용하면 Spring Batch에서 필요한 중요한 빈들을 자동으로 구성하고 설정합니다. 예를 들어, JobRepository, JobLauncher, StepBuilderFactory, JobBuilderFactory 등이 이 애너테이션을 통해 설정됩니다.
아마 위 애노테이션을 자동으로 포함한 autoConfiguration을 스프링부트가 하지 않을까 싶다.
중간 정리
- Intermediate Processor는 변환 과정에 사용될 프로세서로 사용자의 정의로 만들어 준다.
- configuration 클래스에서 reader, writer, processor를 bean으로 등록해 준다.
다음은 실제 작업 구성을 정의한다고 한다.
@Bean
public Job importUserJob(JobRepository jobRepository,
JobCompletionNotificationListener listener,
Step step1) {
return new JobBuilder("importUserJob", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
JdbcBatchItemWriter<Person> writer) {
return new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
첫 번째 importUIserJob은 작업을 정의하고 step1은 단일 단계를 정의합니다.
작업은 단계로 구성되며 각 단계는 reader, processor, writer가 포함될 수 있다고 합니다.
여기서는 데이터베이스를 사용여 상태를 유지하기에 Incrementer가 필요합니다.
이후 각 단계를 나열한다고 합니다. 여기선 하나의 단계만 존재합니다.
단계에서는 한 번에 쓸 데이터의 양을 정의합니다. 이 경우 한 번에 10개의 레코드를 씁니다.
chunk는 제네릭 메서드이기에 접두사가 붙고, 이는 각 청크의 입력 및 출력 유형을 나타낸다고 합니다.
작업을 정의할 때 JobCompletionNotificationListener는 사용자 정의 클래스입니다.
이는 작업이 완료되었을 때 알림을 받는 리스너입니다.
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");
jdbcTemplate.query("SELECT first_name, last_name FROM people",
(rs, row) -> new Person(
rs.getString(1),
rs.getString(2))
).forEach(person -> log.info("Found <" + person + "> in the database."));
}
}
}
전에 배치를 정의할 때 필요한 전처리기 후처리기가 있군요.
예제에서는 JdbcTemplate을 사용하지만 실제로는 다양한 방법으로 구현된 Repository를 받을 것 같습니다.
우선 여기서는 전처리 후처리를 할 수 있다는 것을 알 수 있네요
이를 웹 앱이나 WAR 파일에 포함할 수도 있지만 별도의 독립 실행형 애플리케이션을 만드는 것입니다.
스프링 부트 애플리케이션의 매력이죠
@SpringBootApplication
public class BatchProcessingApplication {
public static void main(String[] args) {
System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
}
}
깔끔합니다.
@SpringBootApplication이 classpath를 확인하여 라이브러리를 확인하고 적합한 빈들을 자동으로 주입해 주고 전부 알아서 해줄 것입니다.
이제 우리는 jar파일로 빌드해야 합니다.
각자 메이븐이나 그래들로 빌드 후 실행하면 다음과 같이 로그가 출력되어야 합니다.
2023-05-14T22:06:25.138+09:00 INFO 17080 --- [ main] c.e.b.BatchProcessingApplication : Starting BatchProcessingApplication using Java 17.0.7 with PID 17080 (C:\Users\PC\batch-processing\target\classes started by PC in C:\Users\PC\batch-processing)
2023-05-14T22:06:25.140+09:00 INFO 17080 --- [ main] c.e.b.BatchProcessingApplication : No active profile set, falling back to 1 default profile: "default"
2023-05-14T22:06:25.498+09:00 INFO 17080 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2023-05-14T22:06:25.613+09:00 INFO 17080 --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Driver does not support get/set network timeout for connections. (feature not supported)
2023-05-14T22:06:25.613+09:00 INFO 17080 --- [ main] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Added connection org.hsqldb.jdbc.JDBCConnection@289778cd
2023-05-14T22:06:25.614+09:00 INFO 17080 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2023-05-14T22:06:25.756+09:00 INFO 17080 --- [ main] c.e.b.BatchProcessingApplication : Started BatchProcessingApplication in 0.821 seconds (process running for 1.092)
2023-05-14T22:06:25.757+09:00 INFO 17080 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2023-05-14T22:06:25.780+09:00 INFO 17080 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importUserJob]] launched with the following parameters: [{'run.id':'{value=1, type=class java.lang.Long, identifying=true}'}]
2023-05-14T22:06:25.789+09:00 INFO 17080 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2023-05-14T22:06:25.803+09:00 INFO 17080 --- [ main] c.e.batchprocessing.PersonItemProcessor : Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE)
2023-05-14T22:06:25.803+09:00 INFO 17080 --- [ main] c.e.batchprocessing.PersonItemProcessor : Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE)
2023-05-14T22:06:25.803+09:00 INFO 17080 --- [ main] c.e.batchprocessing.PersonItemProcessor : Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE)
2023-05-14T22:06:25.803+09:00 INFO 17080 --- [ main] c.e.batchprocessing.PersonItemProcessor : Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE)
2023-05-14T22:06:25.803+09:00 INFO 17080 --- [ main] c.e.batchprocessing.PersonItemProcessor : Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE)
2023-05-14T22:06:25.808+09:00 INFO 17080 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 19ms
2023-05-14T22:06:25.809+09:00 INFO 17080 --- [ main] c.e.b.JobCompletionNotificationListener : !!! JOB FINISHED! Time to verify the results
2023-05-14T22:06:25.810+09:00 INFO 17080 --- [ main] c.e.b.JobCompletionNotificationListener : Found <firstName: JILL, lastName: DOE> in the database.
2023-05-14T22:06:25.810+09:00 INFO 17080 --- [ main] c.e.b.JobCompletionNotificationListener : Found <firstName: JOE, lastName: DOE> in the database.
2023-05-14T22:06:25.810+09:00 INFO 17080 --- [ main] c.e.b.JobCompletionNotificationListener : Found <firstName: JUSTIN, lastName: DOE> in the database.
2023-05-14T22:06:25.810+09:00 INFO 17080 --- [ main] c.e.b.JobCompletionNotificationListener : Found <firstName: JANE, lastName: DOE> in the database.
2023-05-14T22:06:25.810+09:00 INFO 17080 --- [ main] c.e.b.JobCompletionNotificationListener : Found <firstName: JOHN, lastName: DOE> in the database.
2023-05-14T22:06:25.813+09:00 INFO 17080 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importUserJob]] completed with the following parameters: [{'run.id':'{value=1, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 24ms
2023-05-14T22:06:25.815+09:00 INFO 17080 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2023-05-14T22:06:25.821+09:00 INFO 17080 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
여기까지 파일을 읽고 이를 변환하여 실제 출력하는 것까지 진행해 보았다. 15분이면 된다는데, 중간에 @EnableBatchProcessing을 모르고 중복 정의 해버려서 배치가 안 돌아가는 오류를 해결하느라 10분 더 걸렸다.
이제 간단한 프로세스를 경험했으니 각 아키텍처에 대해 알아볼 차례이다.
ps- 4박 5일 괌
'Spring|Spring-boot > Spring Batch' 카테고리의 다른 글
Batch 예제 (0) | 2023.05.24 |
---|---|
Spring Batch 아키텍처 (0) | 2023.05.21 |
Batch란 (1) | 2023.05.14 |
Spring Batch JpaItemWriter (0) | 2021.03.06 |
Spring Batch JdbcBatchItemWriter (0) | 2021.03.06 |
댓글