[스프링 배치 2.0] 걸음마 떼기

1. 라이브러리 추가.

pom.xml에 스프링 번들 저장소에서 제공하는 스프링 배치 가장 최신 버전을 추가해줍니다

프로젝트가 세 개로 나눠져 있더군요. test 모듈은 테스트 용이니까 test scope으로 설정하는 것이 좋겠네요.

<dependency>

<groupId>org.springframework.batch</groupId>
<artifactId>org.springframework.batch.core</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

<dependency>

<groupId>org.springframework.batch</groupId> <artifactId>org.springframework.batch.infrastructure</artifactId> <version>2.0.2.RELEASE</version>
</dependency>

<dependency>

<groupId>org.springframework.batch</groupId> <artifactId>org.springframework.batch.test</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

2. 배치를 Job와 Step으로 표현합니다.

하나의 Job이 하나의 배치 작업이라고 생각하면 될 듯 하고, 배치 작업의 세세한 단계들을 여러 개의 Step으로 표현할 수 있습니다. 하나의 Step은 ItemReader, ItemProcessor, ItemWriter를 각각 가질 수 있는데, 이 것들 다 필요 없이 Tasklet을 구현해서 통짜로 Step을 구현할 수도 있습니다. 그러나 저는 최대한 IR, IP, IW 구조를 이용해 보려고 한 번 써봤습니다.

할 일은 DB에서 특정 데이터들을 읽어와서 이메일로 뿌려주고, 배치가 끝나면 그 결과를 로깅할 것.

2-1. XML 스키마 설정.

batch 네임스페이스를 기본 네임스페이스로 설정한 XML 스키마를 사용합니다.

<?xml version=”1.0″ encoding=”UTF-8″?>
<beans:beans xmlns=”http://www.springframework.org/schema/batch”
    xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance” xmlns:aop=”http://www.springframework.org/schema/aop”
    xsi:schemaLocation=”http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.5.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd”
    xmlns:beans=”http://www.springframework.org/schema/beans”
    xmlns:context=”http://www.springframework.org/schema/context” xmlns:p=”http://www.springframework.org/schema/p”
    xmlns:tx=”http://www.springframework.org/schema/tx”>

// 여기에 job 설정.

</beans:beans>

job 하나 당 하나의 XML 설정 파일로 만드는걸 기본으로 하고, 공통으로 쓸 녀석들은 별도의 XML로 뺀 다음 각각의 job XML에서 import 해서 쓰는 것이 좋을 듯 합니다.

2-2. Job 설정.

본격적으로 job을 설정 파일로 만들어야 합니다. 배치 2.0에서는 개별 아이템들이 아니라 아이템 뭉탱이(chunk) 단위로 배치 작업을 설정할 수 있습니다.

    <job id=”itemCouningJob” job-repository=”jobRepository”>
        <step id=”itemCouningStep”>
            <tasklet>
                <chunk reader=”komaItemReader” writer=”emailWriter”
                    commit-interval=”100″ />
                <listeners>
                    <listener ref=”stepResultListener” />
                </listeners>
            </tasklet>
        </step>
        <listeners>
            <listener ref=”jobResultListener”/>
        </listeners>
    </job>

이게 끝입니다. 하나의 job 내부에. 하나의 step이 있고, 하나의 listerner(jobResultListener)를 등록해 뒀습니다. job 내부에 여러 개의 step을 설정할 수 있고, 여러 개의 리스너도 등록할 수 있습니다.

하나의 step 에는 리더, 프로세서, 라이터를 등록할 수 있는데, 제가 개념 잡기 힘들었던 부분은 하나의 item을 읽어와서 처리하고 바로 쓴다고??? 였습니다. 그러나 그게 아니더군요. 뭉탱이던 개별 Item이던 읽어온 다음 개별적으로 처리를 하는 것 까진 맞는데, 그것들을 하나 하나 라이터가 처리하는게 아니라, 모든 프로세싱이 끝난 아이템 전체 목록을 라이터가 받아서 작업 합니다.

따라서, 제가 할 일이었던 모든 아이템 목록을 읽어오는 것은 리더에게 맡기면 되고, 별도의 프로세싱 작업이 필요 없는 듯 하니 프로세서는 설정하지 않고, 모든 목록을 읽어온 뒤 이메일로 보내야 하니까 emailWriter라는 걸 하나 등록했습니다.

그리고 Step에도 리스너 하나를 등록해 두었습니다. 이 녀석은 Job에 등록한 것과는 달리 한 Step의 결과, 이름, 시작 시간, 끝난 시간, 수행 시간 등을 알 수 있는 여긴한 녀석입니다.; Job에 등록한 리스너는 모든 Step..즉 Job에 대한 내용을 알 수 있지요.

만약, 정의해야 하는 작업이 step의 리더, 프로세서, 라이터 개념과 잘 맞지 않다면..  Tasklet 자체를 구현한 다음 <tasklet ref=””/> 에 등록해주면 됩니다.

2-3. Job 에서 필요한 빈 정의 및 설정.

이제 위에서 설정한 job에 필요한 빈들을 등록합니다. 우선 리파지토리를 등록합니다.

    <job-repository id=”jobRepository” data-source=”dataSource”
        transaction-manager=”transactionManager” isolation-level-for-create=”SERIALIZABLE”
        table-prefix=”BATCH_” />

레퍼런스에 있는 것을 그대로 썼습니다. 실제 DB를 이용하도록 했습니다. 다음은 리더 입니다.

    <beans:bean id=”komaItemReader”
        class=”org.springframework.batch.item.database.HibernateCursorItemReader”
        p:queryString=”from Item” p:sessionFactory-ref=”sessionFactory” />

하이버네이트 커서 아이템 리더로.. 스프링 배치에서 기본으로 제공해주는 클래스입니다. HQL과 세션 팩토리만 설정하면 됩니다. 다음은 라이터 입니다. 이 녀석은 별도로 구현해 주었습니다.

    <beans:bean id=”emailWriter” class=”sandbox.batch.writer.EmailWriter”/>

public class EmailWriter implements ItemWriter {

    @Override
    public void write(List items) throws Exception {
        sendEmail(items);
    }

    private void sendEmail(List items) {
        System.out.println(“Mailing : ” + items);
    }

}

구현이라고 해봤자.. 그냥 저렇게 좀 허접하게 해뒀습니다. 주요 관심사는 아니기 때문에 저렇게 했습니다. 마지막으로 리스너 두개를 구현하는데 이 녀석들은 뭐 간단하니까 패스하죠.

3. 배치 실행하기.

배치 실행은 JobLauncher를 사용하는데 이 녀석은 jobRepository를 필요로 합니다. 따라서 이 녀석도 스프링 DI를 사용하도록 빈으로 등록해 줍니다.

    <beans:bean id=”jobLauncher”
        class=”org.springframework.batch.core.launch.support.SimpleJobLauncher” p:jobRepository-ref=”jobRepository” />

자 이제 드디어 자바 코드로 Job을 실행해 줍니다.

jobLauncher.run(job, new JobParameters());

jobLauncher 이 녀석을 applicationContext에서 DL을 하건, Autowiring으로 가져오건 하고, job도 2번 작업에서 등록한 녀석을 가져옵니다. 그리고 JobParameter 객체를 넘겨주면 실행이 안 될 겁니다.

테이블이 없다는 에러가 나오죠. 다음 스키마 파일 중에 사용 중인 DB에 맞는 녀석을 골라서 테이블을 만들어 준 다음에 실행해 봅니다. 그럼 돌아갈 겁니다.

4. 앞으로 할 일

JobParameter가 이전에 사용한 것과 같다면 배치는 다시 돌지 않습니다. JobParameter를 다른 정보를 가지도록 매번 새로 만들어 줘야 하는데, 그 부분을 좀 살펴봐야겠습니다. Incrementer라는 것과 관련이 있어 보이던데 매번 실행할 때마다 자동으로 겹치지 않는 JobParameter를 만들어 주는게 있을 법도 한데 말이죠.

배치 테스트에 대하 알아봐야겠습니다. 어떻게 테스트 할 수 있는지. 스프링 배치가 제공해주는 테스트 방법은 어떤 것인지.

하이버네이트와 스프링 배치 기본 테이블 스키마를 맵핑해줄 자바빈 객체들은 없는지 살펴봐야겠습니다. 이것도 어딘가 있을 법도 한데 말이죠. 테이블 보고서 직접 자바빈 만들어서 맵핑 해주기는 좀 귀찮자나요. 누군가 해놨거나.. 하이버네이트 툴로 스키마에서 자바빈을 만들고 맵핑도 해주는 걸 찾아보던가 해야겠습니다. 하이버 맵핑만 끝나면.. 뭐 배치 정보 가져와서 화면에 뿌려주기만 하면 되니깐.. 식을 죽 먹기겠네요.

음.. 그리고 간간히 레퍼런스 보면서 이론 공부도 좀 해야겠네요. 여러 용어들이랑 개념들 기타 세부 기능들에는 뭐가 있는지 등등등.

ItemReader – Spring Batch Chapter 3

DB에서는 단일 레코드, 애플리케이션에서는 단일 도메인 객체, 배치 작업에서는 Item. 이 녀석을 읽어들일 때 사용하는 것이 바로 ItemReader.

public interface ItemReader {

  Object read() throws Exception;

  void mark() throws MarkFailedException;

  void reset() throws ResetFailedException;
}

아주 깔끔한 인터페이스 read()는 읽고, mark()는 표시해두고, reset()은 최근에 mark() 된 곳으로 이동. 마치 책갈피가 있는 Iterator 느낌이 납니다.

그리고 이 인터페이스를 구현한 녀석이 DB나 파일등에 연결이 필요한 경우 반드시 같이 구현하는 인터페이스가 있는데 그건 바로 ItemStream.

public interface ItemStream {

  void open(ExecutionContext executionContext) throws StreamException;

  void update(ExecutionContext executionContext);
 
  void close(ExecutionContext executionContext) throws StreamException;
}

ExecutionContext를 사용해서 처음 위치부터 시작하지 않고 이전에 멈췄던 곳에서 다시 시작 할 수 있는 방법을 제공합니다. open() 해서 연결하고, update()는 ExecutionContext에서 현재 상태 읽어옵니다. 따라서 커밋하기 전에 update()를 호출해서 현재상태가 DB에 보존되고 있는지 확인할 때 사용하고, close()는 open으로 읽어온 자원들 반납합니다. 작업 마친 담에 꼭 호출해주는게 좋겠네요.

DB에 있는 Item을 읽어올 때, 스프링의 JdbcTemplate에서는 RowMapper를 사용해서 반환된 ResultSet에 있는 데이터들을 모두. 왕창. 한방에. 객체로 맵핑해서 반환해줬습니다. 이게 이슈가 되기도 합니다. 왜냐면, 데이터가 너무 많으면 메모리가 모자라서 죽게 될테니까요. 그런데 배치에서도 역시 RowMapper를 사용하는데, 그런 일이 벌어지지 않게 하기 위해서 두 가지 방법을 마련했습니다. 하나는 커서Cursor, 하나는 드라이빙쿼리DrivingQuery.

Cursor 스타일이란?
맵핑을 한 번에 전부 하는게 아니라, 한 번에 한 줄씩 합니다. “스트리밍” 방식이라고 할 수 있습니다. 위에서 살펴본 ItemReader의 next()를 호출 할 때마다 한 줄(레코드)씩 이동합니다.

  HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
  itemReader.setQueryString(“from CustomerCredit”);
  //For simplicity sake, assume sessionFactory already obtained.
  itemReader.setSessionFactory(sessionFactory);
  itemReader.setUseStatelessSession(true);
  int counter = 0;
  ExecutionContext executionContext = new ExecutionContext();
  itemReader.open(executionContext);
  Object customerCredit = new Object();
  while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
  }
  itemReader.close(executionContext);

itemReader.read()를 할 때마다 RowMapper를 적용해서 객체를 던져주고, 다음 레코드로 넘어갑니다.

Driving Query 스타일이란?
레코드 전체를 가져오는게 아니라, 주키만 가져온 다음에 필요시에 애플리케이션의 DAO를 사용해서 객체를 DB에서 로딩하는 방법. 이 방법을 쓰는 이유는 DB2 처럼 Pessimistic Locking(읽는 롹킹)하는 경우에 Cursor 방법이 비효율적일 수 있기 때문입니다.

이 녀석의 구현체인 DrivingQueryItemReader가 의존하고 있는 인터페이스 KeyCollector.

 public interface KeyCollector {

    List retrieveKeys(ExecutionContext executionContext);

    void updateContext(Object key, ExecutionContext executionContext);
  }

첫번째 메소드야 뻔하고, updateContext는 현재 어디까지 읽었나, 책갈피를 끼워두는 겁니다.

KeyCollector 구현체
– SingleColumnJdbcKeyCollector : 주키가 컬럼 하나 일때 사용.

  SingleColumnJdbcKeyCollector keyCollector = new SingleColumnJdbcKeyCollector(getJdbcTemplate(),
  “SELECT ID from T_FOOS order by ID”);

  keyCollector.setRestartSql(“SELECT ID from T_FOOS where ID > ? order by ID”);

  ExecutionContext executionContext = new ExecutionContext();

  List keys = keyStrategy.retrieveKeys(new ExecutionContext());

  for (int i = 0; i < keys.size(); i++) {
    System.out.println(keys.get(i));
  }

흠.. 이 예제 코드는 잘 이해가 안 가네요. ItemReader 인터페이스는 안 사용하나;;

복합키 일땐?? ExecutionContextRowMapper

The Domain Language of Batch – Spring Batch Chapter 2

배치 개념을 익히기 정말 좋은 챕터가 아닌가 생각됩니다. 스프링 배치의 도메인 언어인지, 일반적인 배치의 도메인 언어인지는 구분하기 힘들지만, 상당히 많이 정제되어 있다는 느낌을 받을 수 있었습니다. 분명, 수 많은 프로젝트의 배치 작업을 하면서 도출해낸 도메인 언어들이 아닐까 생각됩니다.

요즘은 귀찮아서 그림을 안 그렸었는데, 오랜만에 그려봐야겠습니다. 레퍼런스에 나와있는 그림에 표현하지 않은 도메인(JobParameters, ExecutionContext, Persistence 여부)도 있어서요.

사용자 삽입 이미지
후광이 있는 녀석들은 Persistent Domain입니다. 즉 (DB를 사용한다는 가정하에) Step이라는 테이블이 없다고 (즉, Step 정보를 유지하지 않는다고) 봐도 됩니다. 그래서 StepExecution 테이블에는 JobExecution의 주키를 참조하는 외례키 컬럼만 있고 Step_ID와 같은 컬럼은 없습니다.

연한색 박스는 인터페이스, 진한 색은 클래스입니다.

ExcutionContext는 StepExcuion 당 하나씩 생성됩니다.

재미있는 건 저 도메인들을 저장하는 책임을 지닌 JobRepository라는 인터페이스인데, 이 녀석의 구현이 어떻게 되어 있을까 궁금했는데, 구현체는 없었습니다. 어떻게 구현해야 할런지… JDBC 말고 애노테이션 기반 하이버네이트를 써서 구현하는 방법이 궁금해집니다. JobRepositoryHibernate 야.. 뭐 SessionFactory만 있으면 알아서 넣어줄테니 걱정되지 않는데, 저들 도메인의 맵핑 정보를 넘겨줘야 하는데 말이죠. 그걸 어떻게 애노테이션으로 할 방법은 없을까요. 흠… XML로만 해야 할까요. XML 로 해야 한다면, 맵핑 정보는 어떻게 만들면 될까요? 어느정도 고정적인 도메인이니까, 하나 만들어 두는게 좋을 것 같습니다.

흠.. Spring Batch + 하이버네이트를 기반으로 한 어떤 프레임워크가 되겠군요.

자세한 설명은 http://static.springframework.org/spring-batch/spring-batch-docs/reference/html/core.html 를 참조하세요.