Spring Batch
스프링 배치란?
스프링 배치는 대량의 반복적인 데이터 처리를 위한 프레임워크로 안정적이고 구조화된 배치 처리 기능을 제공한다.
Quartz는 스케줄링, Spring Batch는 데이터 배치 처리를 담당한다. Quartz는 스프링 배치에 없는 다양한 스케줄링 기능을 지원하므로, Quartz + Batch 조합으로 사용한다.
스프링 배치 구조
Job > Step > Tasklet 또는 Chunk 기반 처리
- Job: 배치 작업의 최상위 단위
- Step: Job을 구성하는 하나의 실행 단위
next()로 Step을 순차적으로 연결 가능on(),to(),from(),end()를 활용해 조건 분기 가능(예: step A 실행 후 성공이면 step B 실패면 step C로 수행)
- Tasklet 기반 Step: 단일 작업을 수행
- Chunk 기반 Step:
ItemReader - ItemProcessor - ItemWriter흐름으로 대량 데이터를 처리
Chunk 기반 구성 요소
Chunk: 한 번에 하나씩 데이터를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다룬다.
- ItemReader: 데이터를 하나씩 읽는다.
- ItemProcessor(선택): Reader에서 읽은 데이터를 가공한다.
- ItemWriter: 데이터를 Chunk 단위로 일괄 저장한다.
Reader와 Processor에서는 1건씩 다루고, Writer에선 Chunk 단위로 처리한다.
메타 데이터 테이블 구조
스프링 배치는 총 6개의 메타데이터 테이블을 사용하며, 직접 생성해야 한다.
- BATCH_JOB_INSTANCE:
Job Parameter(외부에서 받을 수 있는 파라미터)에 따라 생성, 같은 Batch Job이라도Job Parameter가 다르면 생성 - BATCH_JOB_EXECUTION: BATCH_JOB_INSTANCE 테블과 부모-자식 관계이다. 부모 BATCH_JOB_INSTANCE가 성공/실패한 모든 내역 저장
- BATCH_JOB_EXECUTION_PARAMS: BATCH_JOB_EXECUTION 생성될 때, 입력받은
Job Parameter저장 - BATCH_STEP_EXECUTION
- BATCH_STEP_EXECUTION_CONTEXT
- BATCH_JOB_EXECUTION_CONTEXT
이 테이블들의 스키마는 spring-batch-core 라이브러리 내부에 포함되어 있으며, IDE에서 schema-로 검색하여 DBMS 별 스키마 파일을 복사해 사용한다.
예시 1: Querydsl 기반 Paging Reader를 사용하는 Job
아래 Job은 데이터를 페이징 방식(ItemReader)으로 읽어, 삭제하는 배치 Job이다.
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DeletePhotoViewHistoryJobConfig {
private final JobListener jobListener;
private final EntityManagerFactory entityManagerFactory;
private final JPAQueryFactory jpaQueryFactory;
@Value("${batch.chunk.size}")
private int chunkSize;
@Bean
public Job deletePhotoViewHistoryJob(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager) {
return new JobBuilder(JobNames.DELETE_PHOTO_VIEW_HISTORY, jobRepository).listener(jobListener)
.start(deletePhotoViewHistoryStep(jobRepository, platformTransactionManager)).build();
}
@Bean
@JobScope
public Step deletePhotoViewHistoryStep(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager) {
return new StepBuilder(StepNames.DELETE_PHOTO_VIEW_HISTORY, jobRepository)
// <T, T> 첫번째 T는 Reader 에 반환될 타입, 두번째 T는 Writer 에 파라미터로 넘어올 타입
.<PhotoViewHistory, PhotoViewHistory>chunk(chunkSize, platformTransactionManager)
.reader(deletePhotoViewHistoryReader(null))
.writer(deletePhotoViewHistoryWriter()).build();
}
@Bean
@StepScope
public QuerydslZeroPagingItemReader<PhotoViewHistory> deletePhotoViewHistoryReader(
@Value("#{jobParameters[current]}") LocalDateTime current) {
log.info("current :: {}", current);
return new QuerydslZeroPagingItemReader<>(entityManagerFactory, chunkSize,
jpaQueryFactory -> jpaQueryFactory.select(photoViewHistory).from(photoViewHistory)
.where(photoViewHistory.createdDatetime.before(current.minusDays(1)))
.orderBy(photoViewHistory.historyKey.asc()));
}
@Bean
@StepScope
public DeletePhotoViewHistoryJobWriter deletePhotoViewHistoryWriter() {
return new DeletePhotoViewHistoryJobWriter(jpaQueryFactory);
}
}
Cursor vs Paging Reader
ItemReader는 Cursor, Paging 방식이 있다.
- Cursor based Reader: 데이터베이스와 커넥션을 맺은 후, Cursor 를 한칸씩 옮기면서 지속적으로 데이터를 뽑아온다.
- 장점: 스트리밍 방식으로 메모리 사용 효율적
- 단점: 하나의 DB Connection을 장시간 사용하기 때문에 배치가 끝나기 전에 커넥션이 끊어질 수 있다. (장기 배치 작업에 취약)
- Paging based Reader
- 장점: 페이지마다 새로운 쿼리 실행 (커넥션 점유 문제 없음)
- 단점: 각 Page마다 조회 성능 고려 필요 (정렬 필수)
대부분의 실제 운영환경에서는 Paging 방식을 권장한다.
예시 2: 외부 서비스(AWS Cost Explorer) 데이터를 ListItemReader로 처리하는 Job
@Slf4j
@Configuration
@RequiredArgsConstructor
public class InsertCostMetricsByDailyJobConfig {
private final JobListener jobListener;
private final CostExplorerService costExplorerService;
private final CostMetricRepository costMetricRepository;
@Value("${batch.chunk.size}")
private int chunkSize;
@Bean
public Job insertCostMetricsByDailyJob(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager) {
return new JobBuilder(JobNames.INSERT_COST_METRICS_BY_DAILY, jobRepository).listener(
jobListener).start(insertCostMetricsByDailyStep(jobRepository, platformTransactionManager))
.build();
}
@Bean
@JobScope
public Step insertCostMetricsByDailyStep(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager) {
return new StepBuilder(StepNames.INSERT_COST_METRICS_BY_DAILY,jobRepository)
.<CostMetric, CostMetric>chunk(chunkSize, platformTransactionManager)
.reader(insertCostMetricsByDailyReader(null))
.writer(insertCostMetricsByDailyWriter())
.build();
}
@Bean
@StepScope
public ListItemReader<CostMetric> insertCostMetricsByDailyReader(
@Value("#{jobParameters[date]}") LocalDate date) { // date=20250101 형태로 외부에서 주입
List<CostMetric> metrics = costExplorerService.getCostByDaily(date); // AWS SDK를 통해 비용 데이터를 조회
return new ListItemReader<>(metrics);
}
@Bean
@StepScope
public InsertCostMetricsByDailyJobWriter insertCostMetricsByDailyWriter() {
return new InsertCostMetricsByDailyJobWriter(costMetricRepository);
}
}
ItemWriter 구현 예시
Reader/Processor에서 전달된 데이터를 Chunk 단위로 처리한다.
@Slf4j
@RequiredArgsConstructor
public class InsertCostMetricsByDailyJobWriter implements ItemWriter<CostMetric> {
private final CostMetricRepository costMetricRepository;
@Override
public void write(Chunk<? extends CostMetric> chunk) throws Exception {
costMetricRepository.saveAll(chunk);
}
}
배치 스케줄링 예시
@Slf4j
@Component
@EnableScheduling
@RequiredArgsConstructor
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final JobRegistry jobRegistry;
/**
* 매일 1시 실행
*/
@Scheduled(cron = "0 0 1 * * ?")
public void runInsertCostMetricsByDailyJob() {
try {
Job job = jobRegistry.getJob(JobNames.INSERT_COST_METRICS_BY_DAILY);
jobLauncher.run(job, getJobParameter());
} catch (NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException |
JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
log.error(e.toString());
}
}
private JobParameters getJobParameter() {
Map<String, JobParameter<?>> parameterMap = new HashMap<>();
parameterMap.put("current",
new JobParameter<LocalDateTime>(LocalDateTime.now(), LocalDateTime.class));
parameterMap.put("now", new JobParameter<LocalDate>(LocalDate.now(), LocalDate.class));
return new JobParameters(parameterMap);
}
}
외부에서 Spring Batch 실행하는 법
위의 예제에서는 @Scheduled를 이용해 애플리케이션 내부에서 배치 Job을 실행했다. 하지만 운영 환경에서는 다음과 같은 요구가 더 많다.
- 배치를 정해진 시간에만 실행하지 않고, 필요 시 수동 실행
- 배포와 분리된 실행 제어
- 실패 시 재실행/파라미터 변경 실행
- 운영자가 명시적으로 실행 버튼을 눌러 실행
Spring Boot + Spring Batch를 CLI 기반으로 실행하고, 이를 Jenkins, Crontab, Airflow 등 외부 스케줄러에서 호출하는 방식이 일반적이다.
실행할 Job 지정하기 (spring.batch.job.name)
여러 개의 Job이 등록된 경우, 특정 Job만 실행하도록 명시해야 한다. 지정하지 않으면 모든 Job 실행 시도를 하는데, 운영에서는 권장하지 않는다.
java -jar batch.jar --spring.batch.job.name=insertCostMetricsByDailyJob
Job Parameter를 외부에서 전달하기
Spring Batch의 Job은 Job Parameter가 같으면 재실행되지 않는다. 따라서 외부 실행 시 항상 파라미터를 명시적으로 전달하는 것이 중요하다.
java -jar batch.jar --spring.batch.job.name=insertCostMetricsByDailyJob date=2025-03-09
위 파라미터는 아래 코드로 주입된다.
@Value("#{jobParameters[date]}") LocalDate date
Jenkins에서 Spring Batch 실행하기
Jenkins Job 구성 예시
Build Step에서 Shell Script 실행
#!/bin/bash
java -jar /app/batch/batch.jar \
--spring.profiles.active=prod \
--spring.batch.job.names=insertCostMetricsByDailyJob \
date=$(date +%Y%m%d)
Jenkins Parameter와 연동
Jenkins의 Build Parameters를 활용하면 더 유연한 실행이 가능하다.
java -jar batch.jar \
--spring.batch.job.names=${JOB_NAME} \
date=${TARGET_DATE}
- 운영자가 UI에서 날자를 지정해 실행
- 장애 복구 시 특정 날짜만 재처리 가능
테스트 코드 예시
@SpringBootTest
@SpringBatchTest
@ActiveProfiles("test")
@Disabled
@EnableJpaAuditing
public abstract class BaseIntegrationTest {
@Autowired
// 테스트 코드에서 Job을 실행할 수 있도록 지원
protected JobLauncherTestUtils jobLauncherTestUtils;
}
public class DeletePhotoViewHistoryTest extends BaseIntegrationTest {
@Autowired
private PhotoViewHistoryRepository photoViewHistoryRepository;
@Autowired
private Job deletePhotoViewHistoryJob;
@BeforeEach
public void setUp() {
jobLauncherTestUtils.setJob(deletePhotoViewHistoryJob);
IntStream.range(0, 1001).forEach(
i -> photoViewHistoryRepository.save(
PhotoViewHistory.builder().photoBoardKey(i + 1).build()));
}
@AfterEach
public void tearDown() {
photoViewHistoryRepository.deleteAll();
}
@Test
public void deletePhotoViewHistoryJobTest() throws Exception {
LocalDateTime current = LocalDateTime.now();
JobParameters jobParameters = new JobParametersBuilder().addString("current",
String.valueOf(current.plusDays(1))).toJobParameters();
// JobParameters와 함께 Job 실행
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
List<PhotoViewHistory> result = photoViewHistoryRepository.findAll();
assertThat(result).hasSize(0);
}
}