infinity : 무한한 성장가능성
Spring Batch ItemReader/Item Writer/ Item Processor 이해 본문
이 글을 쓰게 된 이유
요즘 팀에서는 개발팀에서 공통적으로 사용하던 래거시 배치프로젝트(spring batch 가 아닌 자바로 구현된 배치)에서 팀 내 배치 프로젝트로 배치들을 이관하는 일을 주로 하고 있는데
이관하는 배치에 대해 간단하게 설명하자면, 특정 기간동안 사용하지 않은 정보들을 select 한 뒤 & delete 하는 간단한 배치이다.
pr을 올렸던 코드는 해당 배치를 chunk 기반으로 itemReader에서 select 쿼리를 실행해 제거할 대상을 조회하고 ItemWriter을 통해 delete 쿼리를 실행하는 것이었다.
팀장님이 pr 에 대한 피드백을 주셨는데, 주신 피드백으로는
- delete 쿼리에서 select 구문을 포함하는 방법
- 셀렉트 천건씩 & select 한 것을 모아 in 조건으로 delete
하면서 위와같은 말을 해주셨다 ㅎㅎ
처음에 writer 에서 한 번에 처리되는 거 아닌가? 했는데 나의 오해였다.
select에서 id 값을 가져온다고 한다면, delete에서는 제거 조건이 id 값이 동일한지 보고 제거하는 것이었기 때문에,
실질적으로 select 에서 천 개의 제거 대상이 있다면, writer에서 delete 쿼리도 천 번이 실행되는 것이었다.
그럼 확실히 in 절로 한번에 쿼리를 실행하는 것이 효율성 측면에서 좋은 것을 알 수 있다.
코드 개선을 하며 헷갈렸던 것 🤔
- ItemProcessor 은 ItemReader에서 item을 읽어올 때 건건이 ItemProcessor 이 실행되는 것 인지, ItemReader에서 item을 chunk size 만큼 읽어오고 ItemProcessor 이 실행되는 것 인지
- ItemReader 에서 읽어온 것을 ItemWriter로 넘길 때 어떻게 한 번에 넘겨야 될지 방법에 대한 고민
- ItemProcessor을 써서 Collection에 담아서 Writer에 넘겨야 하나?
- ItemWriter에서 처리할 수 있나?
ItemReader/Item Writer/ Item Processor의 동작 방식에 대해 간단하게 정리 및 기존 코드를 개선한 방법에 대해 다뤄보려고 합니다.
ItemReader/Item Writer/ Item Processor 의 동작 방식
- ChunkProvider: Chunk 단위 아이템 제공자, ItemReader를 사용해 소스로부터 아이템을 Chunk size 만큼 읽어 Chunk 단위로 만들어 제공하는 객체
- ChunkProcess: Chunk 단위 아이템 처리자
전반적으로 동작하는 프로세스가 잘 나와있는 그림입니다.
그림에 대해 간단하게 설명하자면, ChunkOrientedTasklet 의 execute() 함수를 실행하면서 스프링 배치가 시작됩니다.
ChunkOrientedTasklet 이 ChunkProvider 의 provide()을 호출하면, 최종으로 inputs에 읽어온 데이터가 담기고
chunkProcessor.process() 메서드를 호출해 그 내부에서 ItemProcessor의 작업과 ItemWriter의 작업이 이뤄지게 됩니다.
ChunkProvider의 구현체로는 SimpleChunkProvider, FaultTolerantChunkProvider 가 있고
SimpleChunkProvider을 기준으로 살펴보겠습니다.
👉 chunkProvider.provide()에서 실행되는 로직
1) ItemProcessor
try 문 안을 보면 read() 함수를 호출하는데, 해당 함수는 최종으로 SimpleChunkProvider의 doRead 함수를 호출하게 됩니다.
if(item == null)인 부분을 보면 더 이상 읽어올 item 이 없는 경우 RepeatStatus를 FINISHED로 리턴하는 것을 볼 수 있습니다.
즉, item을 읽어올 것이 계속 있는 경우에는 맨 마지막에 return RepeatStatus.CONTINUABLE 로 리턴되며 item 을 계속 읽어오게 됩니다.
itemReader로부터 item을 읽어오고, item 이 반환되는 것을 볼 수 있습니다,.
👉 chunkProcessor.process()에서 실행되는 로직
ChunkProcessor의 구현체로는 SimpleChunkProcessor, FaultTolerantChunkProcessor 가 있고 SimpleChunkProcessor 기준으로 살펴보도록 하겠습니다.
transform() 메서드를 먼저 살펴보면
ItemReader을 통해 읽어온 items를 inputs으로 받아 input 수만큼 doProcess 가 실행되는 것을 볼 수 있습니다,
itemProcessor 이 null 이 아니면 itemProcessor에서 process 메서드를 통해 item 객체의 변환? 작업이 이뤄지고
그 결과를 반환하게 됩니다.
2) ItemWriter
chunkProcessor.process()에서
doWriter 메서드에서 items를 인자로 넘겨 메서드를 실행하게 됩니다.
메서드를 타고 가면 doWrite 호출 -> writerItems 메서드를 호출하게 되고
itemWriter에서 write 메서드를 호출해 쿼리를 실행하게 됩니다.
간락하게 동작방법에 대해 학습했고, 그것을 바탕으로 위에서 헷갈렸던 것을 정리해 보겠습니다.
- ItemProcessor 은 ItemReader에서 item을 읽어올 때 건건이 ItemProcessor 이 실행되는 것 인지, ItemReader 에서 item 을 chunk size 만큼 읽어오고 ItemProcessor 이 실행되는 것 인지
👉 위에서 본 것처럼 ChunkProvide.provide() 메서드가 끝나고 ChunkProcess.process() 안에서 ItemProcessor 이 실행되는 것을 볼 수 있습니다. 즉 ItemReader에서 item 를 chunk 수만큼 읽어오고, ItemProcessor 로직이 실행되게 됩니다. - ItemReader 에서 읽어온 것을 ItemWriter로 넘길 때 어떻게 한 번에 넘겨야 될지 방법에 대한 고민
- ItemProcessor을 써서 Collection에 담아서 Writer에 넘겨야 하나?
- ItemWriter에서 처리할 수 있나?
👉 itemWriter에서 읽어온 items을 받아오고, process를 통한 객체의 변환과정이 필요 없기 때문에 process 가 아닌 writer에서 처리할 수 있습니다.
그럼 최종적으로 어떻게 처리했을까?
이 긴 과정이 민망할 정도로, 해결방법은 매우 간단했습니다.
ItemWriter을 익명 클래스로 만들어 writer을 오버라이딩 하여 구현했는데
생각해 보면, 정말 간단한 해결방법이 있었는데 각 동작 방법에 대해 확실한 개념이 잡히지 않아 어렵게 느껴졌던 것 같습니다.
덕분에 ChunkOrientedTasklet의 동작방식에 대해 명확히 공부하게 되어, 헷갈리지 않을 거 같습니다.
잘못된 부분은 댓글로 달아주시면 수정하도록 하겠습니다.
긴 글 읽어주셔서 감사합니다!