지정한 job만 실행될 수 있도록 application.yml에는 아래 내용을 추가

spring:
  batch:
    job:
      names: ${job.name:NONE}

main 함수에는 아래와 같은 job이 실행된 다음 종료될 수 있는 코드를 추가하였다.

 ConfigurableApplicationContext applicationContext = SpringApplication.run(TestBatchApplication.class, args);
        System.exit(SpringApplication.exit(applicationContext));

 

Dashboard > Jenkins관리 > Configure System 에서 아래의 설정을 해준다. 

1. Global properties (jar위치, jvm 옵션 등등)

2. Build Timestamp

Timestamp 플러그인을 다운 받아 아래와 같이 설정 

Jenkins에서 Freestyle Project를 선택해서 shell script를 실행하게 하였고,

AWS EC2에 있는 jar 파일을 직접 실행하게끔 하였다. 

 

지정한 jar파일에 접근할 수 있도록 권한 주기

sudo chown -R ec2-user:ec2-user /var/lib/jenkins/workspace/

 

 

위에서 구성한 Jenkins Project들의 이름은 Pipeline 스크립트에서 사용된다.

 

파이프라인 설정

 

Pipeline 아이템 추가

 

파이프라인을 설정해주는데 나중에는 Trigger는 'Build periodically'를 선택해 일정 주기로 실행될 수 있게 해줄 것이다. 

파이프라인 스크립트는 아래와 같이 간단하게 작성해본다.

pipeline {
    agent none
    stages {
        stage('a') {
            steps {
                build 'ATargetJob'
            }
        }
          stage('b') {
            steps {
                build job: "BTargetJob", wait : true
            }
        }
        stage('c') {
            steps {
                build job: "CTargetJob", wait : true
            }
        }
        stage('d') {
            steps {
                build job: "DTargetJob", wait : true
            }
        }
    }
}

 

순차적으로 a->b->c->d가 실행되고, 중간에 실패되면 pipeline은 중지된다. 

AWS EC2 환경에 설치된 Jenkins로 실행하다 보니 로컬 환경에서와는 달리 실패되는 부분이 있었고, 결국 11번의 시도 끝에 pipeline이 정상적으로 실행되었다. 

 

 

 

이제 큰 틀은 짰으니 세세한 부분을 신경쓰러 떠나보겠다...

하루 그리고 반나절을 투자해 Spring Batch를 Spring Cloud Data Flow로 관리하려고 했다. 배치 잡 실행과 모니터링을 중앙에서 관리하고, UI도 심플하니 맘에 들어서였다. 그러나 결론은 실패..... 실패한 이유는 사실 프로젝트 일정에 맞추기엔 docker에 대한 사전 지식 부족으로 시간을 무한정 투자할 수가 없어서였다.

 

우선 SCDF설치를 하는 방법엔 여러가지가 있는데 (Docker Compose, Cloud Foundry, Kubernetes) 그 중에서 나는 docker를 이용해 로컬에 설치하는 방법을 택했다. 

https://dataflow.spring.io/docs/installation/local/docker/

 

Spring Cloud Data Flow(SCDF) 서버에 배치 잡 애플리케이션을 등록하려면 다음과 같은 단계를 따를 수 있다.

  1. 배치 잡 애플리케이션 빌드
    1. Spring Batch를 사용하여 배치 잡을 작성합니다.
    2. 배치 잡 애플리케이션을 빌드하여 실행 가능한 JAR 파일을 생성합니다.
  2. SCDF 서버 설치
    1. 도커로 설치
  3. 애플리케이션 등록
    1. SCDF 서버 대시보드에서 "Create App" 버튼을 클릭하여 애플리케이션을 등록합니다.
      "Type"을 "batch"로 선택하고, "Name"에 애플리케이션 이름을 입력하고, "URI"에 빌드한 JAR 파일의 경로를 입력합니다. (애플리케이션을 등록하면 SCDF 서버에 배치 잡 애플리케이션을 등록한 것)
  4. 배치 잡 실행
    1. SCDF 서버 대시보드에서 배치 잡 애플리케이션을 선택하고 실행을 시작합니다.
    2. 실행 구성 및 배치 잡 인수를 지정할 수 있습니다.

 

컴퓨터에 docker설치도 안되어 있던 터라 도커부터 설치를 해주고.. 

아래 docker-compose 파일을 실행해준다. 해당 파일에는 mysql, rabbitmq, dataflow-server, skipper-server, promateus, grafana 서비스를 등록한 내용이 담겨있다. spring cloud data flow를 설치하려면 db, messaging, dataflow-server,skipper-server가 필요하고 scheduling을 위해선 promateus, grafana 설정이 추가로 필요하다. 

 

version: '3'

services:
  mysql:
    image: mysql:5.7.25
    container_name: dataflow-mysql
    environment:
      MYSQL_DATABASE: dataflow
      MYSQL_USER: root
      MYSQL_ROOT_PASSWORD: rootpw
    networks:
      - dataflow-network
    expose:
      - '3306'
    ports:
      - '3306:3306'

  rabbitmq:
    image: rabbitmq:3.7.17-management-alpine
    container_name: dataflow-rabbitmq
    networks:
      - dataflow-network
    ports:
      - '5672:5672'
      - '15672:15672'

  dataflow-server:
    image: springcloud/spring-cloud-dataflow-server:2.2.1.RELEASE
    container_name: dataflow-server
    volumes:
      - "./tmp:/tmp"
      - "./workspace:/workspace"
    networks:
      - dataflow-network
    ports:
      - "9393:9393"
    environment:
      - spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.host=rabbitmq
      - spring.cloud.skipper.client.serverUri=http://skipper-server:7577/api
      - spring.cloud.dataflow.applicationProperties.stream.management.metrics.export.prometheus.enabled=true
      - spring.cloud.dataflow.applicationProperties.stream.spring.cloud.streamapp.security.enabled=false
      - spring.cloud.dataflow.applicationProperties.stream.management.endpoints.web.exposure.include=prometheus,info,health
      - spring.cloud.dataflow.grafana-info.url=http://localhost:3000
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow
      - SPRING_DATASOURCE_USERNAME=root
      - SPRING_DATASOURCE_PASSWORD=rootpw
      - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
    depends_on:
      - rabbitmq
    entrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-dataflow-server.jar"

  skipper-server:
    image: springcloud/spring-cloud-skipper-server:2.1.2.RELEASE
    container_name: skipper
    volumes:
      - "./tmp:/tmp"
      - "./workspace:/workspace"
    networks:
      - dataflow-network
    ports:
      - "7577:7577"
      - "9000-9010:9000-9010"
      - "20000-20105:20000-20105"
    environment:
      - SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_LOW=20000
      - SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_HIGH=20100
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow
      - SPRING_DATASOURCE_USERNAME=root
      - SPRING_DATASOURCE_PASSWORD=rootpw
      - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
    entrypoint: "./wait-for-it.sh mysql:3306 -- java -Djava.security.egd=file:/dev/./urandom -jar /spring-cloud-skipper-server.jar"

  # Grafana is configured with the Prometheus datasource.
  # Use `docker exec -it prometheus /bin/sh` to log into the container
  prometheus:
    image: springcloud/spring-cloud-dataflow-prometheus-local:2.2.1.RELEASE
    container_name: prometheus
    volumes:
      - 'scdf-targets:/etc/prometheus/'
    networks:
      - dataflow-network
    ports:
      - '9090:9090'
    depends_on:
      - service-discovery

  # The service-discovery container. Required for Prometheus setup only
  # Use `docker exec -it service-discovery /bin/sh` to log into the container
  service-discovery:
    image: springcloud/spring-cloud-dataflow-prometheus-service-discovery:0.0.4.RELEASE
    container_name: service-discovery
    volumes:
      - 'scdf-targets:/tmp/scdf-targets/'
    networks:
      - dataflow-network
    expose:
      - '8181'
    ports:
      - '8181:8181'
    environment:
      - metrics.prometheus.target.cron=0/20 * * * * *
      - metrics.prometheus.target.filePath=/tmp/scdf-targets/targets.json
      - metrics.prometheus.target.discoveryUrl=http://dataflow-server:9393/runtime/apps
      - metrics.prometheus.target.overrideIp=skipper-server
      - server.port=8181
    depends_on:
      - dataflow-server

  # Grafana SCDF Prometheus pre-built image:
  grafana:
    image: springcloud/spring-cloud-dataflow-grafana-prometheus:2.2.1.RELEASE a
    container_name: grafana
    networks:
      - dataflow-network
    ports:
      - '3000:3000'

networks:
  dataflow-network:

volumes:
  scdf-targets:

 

docker -f XXX.yml up 

명령어를 통해 위 파일을 실행해주면 아래와 같이 http://localhost:9393/dashboard/ 에서 Data Flow 화면을 확인할 수 있다. 

여기서 더 이상의 진행은 중단했다. APP을 등록하는 것에서 애를 먹기도 먹거니와 이걸 결국에는 운영 EC2 환경에서 다양한 변수들을 고려하여 세팅을 해줘야 하는데 허들이 꽤나 있을 것으로 예상되었다. 솔직히 이것만 붙잡고 하고 싶은 마음이 굴뚝 같았으나 (승부욕 발동) 새로운 기술을 써보겠다는 개인적인 사리사욕보다는 우선은 프로젝트 일정에 맞추는 것 더 중요하다 판단되어 일단 이 정도까지 알아본 것으로 마무리하고, 원래 계획대로 Jenkins에서 Spring Batch를 실행하기로.. (쥬륵)너무 아쉽고, 꼭 프로젝트 마치고 다시 도전해봐야 겠다. 

 

팀에서 처음으로 Spring Batch를 사용하다 보니 문제 상황이 생기면 물어볼수도 없어서 혼자 끙끙대는 중인데 가끔 스스로 에게 하는 질문이나 의문 자체가 잘못된 느낌도 들고, Spring Batch에 대해 다시 처음부터 공부하고 시작해야 하는게 아닌가 하는 불안감도 들고 개발 하는 내내 싱숭생숭하다; 개발하면서 또 이런 적은 처음이라 당황스럽다-_-; 일단 개발은 시작했는데 내일은 다시 좀 기본 컨셉, 개념들을 살펴보고 코드들을 전체적으로 훑어봐야겠다..

 

1) 복잡한 Query를 질의해야 한다.

 

단순 조인으로만 이루어진 쿼리가 아니라 union all이나 여러 개의 내부 select절이 포함된 복잡한 쿼리를 Reader에서 던져야 했다. (JPA를 사용하는 스프링 부트 프로젝트에선 QueryDSL로도 좀 만들기 까다로운 경우에는 Native Query를 사용해서 질의를 해왔다.) 후보에는 JpaPagingItemReader, JdbcPagingItemReader, JdbcCursorItemReader이 올랐고, Cursor의 경우는 Socket Time Out의 문제로 Connection 이 끊어지지 않게끔 설정을 해줘야 되는 번거로움이 존재했다. 다른 블로그에서도 Paging을 추천하기도 했고, Jpa가 익숙해서 JpaPagingItemReader를 사용하고 있었는데 JpaPagingItemReader로는 복잡한 쿼리를 질의하는 케이스를 아무리 찾아봐도 보이지가 않는다(그때는..) 여차 저차 비슷한 걸 찾아서 JpaNativeQueryProvider를 이용해보았는데 실패; (메인 쿼리 전에 Jpa 에서 사전? 쿼리를 날리는데 이게 문제가 된다) 그래서 JdbcPagingItemReader를 사용해보려고 했는데 setSelectClase()같이 쿼리를 구분해서 넣어주기엔 쿼리가 너무 복잡해서 어쩔 수 없이 JdbcCursorItemReader를 사용하게 됐다. 

 

    @Bean
    public JdbcCursorItemReader<TestEntity> jdbcCursorItemReader() {
        return new JdbcCursorItemReaderBuilder<TestEntity>()
                            .fetchSize(chunkSize)
                            .dataSource(dataSource)
                            .rowMapper(new TestEntityMapper()) // 원래는 이 부분에서 BeanPropertyRowMapper를 사용했었음
                            .sql(getQuery())
                            .name("jdbcCursorItemReader")
                            .build();
    }

간단하게 바꿔서 가져와 봤다. 여기서 또 다른 난관을 맞닥뜨리게 되는데; rowMapper 부분이다;

현재 DB는 PostgreSQL을 쓰고 있고, 컬럼 타입이 Array인 경우가 많다. 이런 경우에는 아래와 같이 타입을 선언해주고 CRUD를 처리했는데 JdbcCursorItemReader의 rowMapper로 전달한 BeanPropertyRowMapper가 해당 타입을 변환해주지 못하고 에러를 뱉는다. 

@Column(name = "emails", columnDefinition = "varchar[]")
@Type(type = "~~~.CustomStringArrayType")
private String[] emails;

그래서 RowMapper를 Implements한 Custom Mapper를 생성해주고 아래와 같이 Array.타입을 변환해 mapping해주게 하였다.

public class TestEntityMapper implements RowMapper<TestEntity> {
    @Override
    public RiExpirationTarget mapRow(ResultSet rs, int rowNum) throws SQLException {
        TestEntity testEntity = new TestEntity();
        testEntity.setEmails((String[]) rs.getArray("emails").getArray());
     	~~~~
        return testEntity;
    }
}

여기까지 왔는데 아래 포스팅을 발견...  갑자기 이런 의문이 든다;

아무리 복잡한 쿼리라고 해도 우아한 형제들에서 쓰는 쿼리보다 복잡하려나? 내가 혹시 이상한 길로 간게 아닐까 하는 의구심이 들며... 우선은 이번 프로젝트를 마치고 바로 해당 부분을 리팩토링 하는 것을 목표로 삼겠다.. 

https://techblog.woowahan.com/2662/

 

2) Processor에서 DB에 접근해 데이터를 가져오는 것이 찝찝한 느낌.. 그래도 될까? (Reader에서 처리되어야 할 것 같은 너낌 적인 너낌)

 

Spring Batch의 Processor 단계에서 DB에 접속해서 데이터를 가져오는 로직이 존재해도 된다고 한다.

Processor는 일반적으로 데이터의 변환 및 가공을 수행하는 단계로 데이터 소스에서 필요한 데이터를 가져와 가공하는 것이 일반적이라고 한다.

 

그러나 다음과 같은 사항에 유의해야 한다고 한다.

  1. 성능 문제: Processor가 데이터 소스에 매번 접속하고 데이터를 가져오는 것은 성능상 좋지 않을 수 있습니다. 따라서 대량의 데이터를 처리할 때는 Chunk 단위로 처리하거나, 데이터베이스 쿼리 최적화를 고려해야 합니다.
  2. 트랜잭션 처리: Processor에서 데이터 소스에 접속하고 데이터를 가져올 때, 트랜잭션 처리에 대한 고민이 필요합니다. 

따라서 Processor에서 DB에 접속해서 데이터를 가져오는 로직이 존재해도 되지만, 성능과 트랜잭션 처리에 대한 고민이 필요한데 Spring Batch에서는 일반적으로 Chunk 단위로 트랜잭션 처리를 하므로, Processor에서도 Chunk 단위로 트랜잭션 처리를 해야한다. 

Processor에서 사용하는 메소드에 @Transactional을 선언할 수 있지만, 이 경우 Chunk 단위로 트랜잭션이 처리되지 않을 수 있다. 따라서 @Transactional을 사용할 경우, Processor가 Chunk 단위로 실행되도록 설정해야 한다.

 

Chunk 단위로 트랜잭션 처리를 하기 위해서는 ItemReader에서 데이터를 읽어올 때, 트랜잭션을 시작하고, ItemWriter에서 데이터를 처리한 후에 트랜잭션을 커밋하면 된다. 이렇게 Chunk 단위로 트랜잭션 처리를 하면, Processor에서 DB에 접속해서 데이터를 가져오는 로직을 작성해도 안전하게 사용할 수 있게 된다.

1) 데이터 일부가 read/write이 안된다..

같은 Entity를 Reader로 읽고, Writer로 Update하려니까 전체가 100개라고 하면 약 50개 밖에 Update가 안되는 문제가 있었다. JpaPagingItermReader를 사용해서 그런 것이었고, 원인은 update를 chunk 단위로 함에 따라 limist과 offset를 이용해 select하는 결과값이 달라져서였다. 해결 방법은 Cursor를 사용하거나 PagingReader를 Overriding 하는 것이고, 자세한 내용은 아래에 글에 나와있다. (빨리 원인을 찾고 해결해서 다행..ㅠ)

 

https://jojoldu.tistory.com/337

 

 

Spring Batch Paging Reader 사용시 같은 조건의 데이터를 읽고 수정할때 문제

안녕하세요. 이번 시간에는 Spring Batch를 사용하시는 분들이 자주 묻는 질문 중 하나인 같은 조건의 데이터를 읽고 수정할때 어떻게 해야하는지 에 대해서 소개드리려고 합니다. 모든 코드는 Githu

jojoldu.tistory.com

 

2) 데이터를 가공해야만 한다..

DB에 저장된 원본 데이터 1row를 읽고 다른 테이블에 1row를 저장하는 설계를 바탕으로 개발을 했는데 N rows : 1 row의 데이터 연관관계가 있었다; 이에 따라 N rows를 1rows로 만드는 Step(2개)을 만들어 메인 Step전에 수행시켜 줬다. 

 

 

Main Step이 실행되기 전에 Sub1과 Sub2를 거쳐야 하며 Sub1이 실패가 나던, Sub2가 실패가 나던 무조건 StepMain은 실행해야 되는 논리구조다.

  • Sub1, Sub2 둘 다 실패가 나더라도 Step Main 은 실행
  • Sub1이 실패하면 Sub2->Main, Sub2가 실패하면 면 Sub1->Main

 

 

 

위와 같은 논리 구조를 아래와 같이 구현해봤는데, 살짝 의구심이 들었지만 테스트를 해보니 원하는 대로 동작한다. 

    @Bean
    public Job expirationNotificationJob() {
        return jobBuilderFactory.get("expirationNotificationJob")
                                .start(sub1ExpirationStep())
                                    .on("*")
                                    .to(sub2ExpirationStep())
                                    .on("*")
                                    .to(mainExpirationStep())
                                    .on("*")
                                    .end()
                                .end()
                                .build();
    }

Youtube 개발자 인큐티비의 SpringBatch 편을 참고하여 질문지를 리스트업 했습니다. 

 

  • 왜 스프링 배치를 사용하는가?
    • 대용량 데이터를 처리해야 함
    • 사용자 개입 없이 동작
    • 로깅, 통계처리, 트랜잭션 등의 비즈니스 로직 외에 배치 어플리케이션에 필요한 기능 사용 가능
    • 지정한 시간 내에 다른 어플리케이션을 방해하지 않고 수행
    • 충돌이나 중단 되었을 때 컨트롤이 가능함
  • 멱등성은 어떻게 유지하는가?
    • 멱등성 : 연산을 여러번 적용하더라도 결과가 달라지지 않는 성질
    • Spring Batch Job Parameter를 사용하여 외부에서 값을 주입받도록 하여 제어가 불가능한 코드를 제거한다.
      • ex) LocalDateTime.now()
    • 멱등성이라는 패러다임이 Spring Batch와는 적합하지 않다는 내용의 의견도 많다. (https://namocom.tistory.com/752)
      • Job Parameter를 이용해 외부에서 주입하는 것은 책임의 이동 정도로 이해하자는 의견도..
  • Spring Batch 메타 데이터 테이블은 어떤 것이 있는가?
    • (맨 하단 ERD 참고)
    • BATCH_JOB_INSTANCE (최상위)
      • Job Instance 객체의 정보를 담고 있음 
    • BATCH_JOB_EXECUTION_PARAMS
      • Job Parameter 정보를 담고 있음, Job 실행 시 사용된 파라미터 저장
      • 정규화되지 않은 형태의 테이블로 TYPE_CD 컬럼에서 저장되는 파라미터 타입을 가진다. 
    • BATCH_JOB_EXECUTION
      • Job Execution 객체의 정보를 가지고 있음, Job이 run 할 때마다 row가 추가됨
    • BATCH_STEP_EXECUTION
      • Step Execution 객체의 정보와 대응되는데 하나의 Job Execution에서 사용하는 Step 개수만큼 테이블의 row에 추가된다.
      • 배치가 돌아서 처리한 개수를 알고 싶을 때는 이 테이블을 찾아보면 된다. 
    • BATCH_JOB_EXECUTION_CONTEXT
      • 하나의 Job Execution 에 대해 하나의 Job Execution Context가 존재하며, Job레벨의 모든 데이터를 다 가지고 있다.
    • BATCH_STEP_EXECUTION_CONTEXT
      • 하나의 Step Execution 에 대해 하나의 Step Execution Context가 존재하며, 
      • JobInstance가 중단된 위치에서 다시 시작할 수 있도록 실패 후 검색되어야 하는 정보도 담고 있음
  • 배치 중간 실패하면 어떻게 처리하는지?
    • skip
      • 데이터를 처리하는 동안 설정된 Exception이 발생했을 경우, 해당 데이터 처리를 건너뛰는 기능
      • default 값은 0으로 사용을 원하는 경우 반드시 0 이상의 숫자를 입력해주고, 어떤 Exception을 skip 할 것인지 반드시 명시해주어야 한다. 
      • chunk 내부에서 이뤄지는데 Read/Process/Write 하는 과정에 설정해줄 수 있다. 
    • retry (아래 그림 참고)
      • 데이터를 Process/Write 하는 과정에서 설정된 Exception이 발생했을 경우, 지정한 정책에 따라 데이터 처리 재시도 하는 기능.
      • Read과정에서 주로 발생하는 FlatFileParseException 에 대한 문제는 대부분  Skip에서 처리가 된다.
      • 예를 들면 DeadlockLoserDataAccessException 발생 시 Retry가 일어나도록 설정할 수 있다. 다른 프로세스에서 처리중인 데이터에 새로운 프로세스가 접근하면 Locak이 걸려 있어 에러가 발생하는데 Retry를 하면 성공할 수 있을 것이다. 
  • 트랜잭션 관리 왜 청크단위로 하는가?
    • Chunk 
      • 각 커밋 사이에 처리되는 row 수
    • Chunk 지향 처리란?
      • Chunk 단위로 트랜잭션을 다루는 것 
      • Reader와 Processor에서는 1건씩 다뤄지고, Writer에선 Chunk 단위로 처리
        • Reader에서 데이터를 하나 읽어옵니다
        • 읽어온 데이터를 Processor에서 가공합니다
        • 가공된 데이터들을 별도의 공간에 모은 뒤, Chunk 단위만큼 쌓이게 되면 Writer에 전달하고 Writer는 일괄 저장합니다.
    • Why Chunk?
      • 커밋을 매번 하면 비용이 많이 듭니다. 데이터가 많은 경우라면 (Spring Batch를 사용한다면 당연하겠죠?) 매번 커밋을 하는 것은 이상적이지 않고, 각 트랜잭션에서 가능한 한 많은 항목을 처리하는 것이 바람직합니다. 이러한 이유로 한 커밋 내에서 처리하는 수를 chunk로 관리하게 된 것입니다. 
      • Chunk-oriented 프로세싱을 하게 되면 다양한 기능들을 사용할 수 있는 것이 장점인데 skip, retry, 특정 Exception에 대한 Rollback, 다양한 ItemReader 그리고Cursor, Paging 등이 대표적이다.
    • Page Size와 Chunk Size
      • PagingItemReader를 사용하면 보이는 Page Size는 한번에 조회할 Item의 양이고, Chunk Size는 한번에 처리될 트랜잭션 단위이다.
      • 보편적으로 두개의 사이즈 크기는 일치하는게 좋다. 
  • Cursor 기반 vs Paging 기반
    • Cursor는 한칸씩 커서를 옮기면서 데이터 1 Row씩을 가져온다.
      • 배치 처리가 완료될 때까지 데이터를 읽어오기 때문에 DB Connection Time이 Paging보다 길다. 
      • 모든 데이터를 메모리에 저장하기 때문에 메모리 사용량이 많다.
    • Paging은 설정한 PageSize 만큼 데이터를 가져오며 데이터 결과의 순서가 보장될 수 있도록 order by 사용 필요
      • PageSize만큼 DB Connection을 읽고 종료한다. 따라서 Cursor에 비해 상대적으로 DB Connection Time이 적다. 
      • 하지만 트랜잭션을 여러번 타야하는 단점이 있긴 하다. 
  • Multi Thread & Partitioning
    • 정해진 시간 안에 많은 데이터를 처리하기 위해 성능을 높이기 위해 사용하는 방법
    • 서비스에 적재된 데이터가 적을 경우에는 Spring Batch의 기본 기능들만 사용해도 큰 문제가 없으나, 데이터가 엄청나게 많이 쌓일시 배치 애플리케이션 역시 확장이 필요
    • Multi-threaded Step (아래 그림 참고)
      • 단일 Step을 수행할 때, 해당 Step 내의 각 Chunk를 별도의 여러 쓰레드에서 실행하는 방법
      • 정한 개수(throttleLimit)만큼의 스레드를 생성하여 수행하는데 ItemReader는 반드시 Thread-safe인지 확인해야 하며(데이터를 중복으로 읽어 오지 않게 하기 위해) 스프링 배치에서 제공하는 것중 JdbcPagingItemReader, JpaPagingItemReader가 Thread-safe하다.
    • Partitioning
      • Master가 데이터를 파티셔닝 한 다음 Slave가 개별 스레드를 통해 각 파티션을 처리하는 방식
      • 각 SlaveStep은 ItemReader / ItemProcessor / ItemWriter 등을 갖고 동작하며 작업을 독립적으로 병렬 처리합니다.
    • Multi-threaded 는 Thread-safe를 신경써야 하나 Partitioning은 Thread-safe하지 않아도 됩니다. 
  • tasklet model vs chunk (reader,processor,writer)
    • Step은 Tasklet 혹은 Chunk로 처리할 수 있다. 
    • When to use
      • Tasklet 
        • 실행할 작업이 간단하여 집계가 필요없고, 실행만 필요한 경우
      • Chunk
        • 실행할 작업이 복잡하고 청크 지향 처리를 사용하는 읽기, 처리 및 쓰기와 관련된 작업 실행이 포함된다고 가정합니다.
  • Batch 실행은 어떻게 하는지 
    • Jenkins로 
  • 모니터링은 어떻게? 
    • 예전에는 Spring Batch Admin이 있었는데 deprecated 되었고, 현재는 spring에서 Spring Cloud Data Flow 를 사용하라고 한다. 
    • 요건 직접 사용해보고 내용 추가해보겠습니다~
 
 

 

Spring Batch Meta Table


 

Spring Batch Retry


Multi-thread Step


 

AsyncItemProcessor / AsyncItemWriter


Cursor vs Paging

 


참고

https://jojoldu.tistory.com/489

https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#commitInterval

https://www.egovframe.go.kr/wiki/doku.php?id=egovframework:rte2:brte:batch_core:skip_repeat_retry 

https://velog.io/@backtony/Spring-Batch-%EB%A9%80%ED%8B%B0-%EC%8A%A4%EB%A0%88%EB%93%9C-%ED%94%84%EB%A1%9C%EC%84%B8%EC%8B%B1

아래 강의를 보고 정리한 내용입니다. 

[우아한테크세미나] 190926 우아한스프링배치 by 우아한형제들 이동욱님  https://youtu.be/_nkJkWVH-mo

 

  • JobParameter
    • Spring Batch는 외부에서 파라미터를 주입받아 Batch 컴포넌트에서 사용 할 수 있다. 
    • 사용법 : @Value("#{jobParameters[파라미터명]}") 타입 이름
  • @JobScope
    • Step에서 사용할 수 있고, Job이 실행되는 시점에 Bean이 생성된다.
    • 사용 예
      • @JobScope라고 선언해줘야 호출한 곳에서 넘겨진 파라미터를 받을 수 있다.
      • 컴파일 에러 방지를 위해 우선은 null값을 넘겨준다.
@Bean
public Job scopeJob() {
	return jobBuilderFactory.get("scopeJob")
    			.start(scopeStep1(null))
                            .next(scopeStep2())
                            .build();
}

@Bean
@JobScope
public Step scopeStep1(@Value("#{jobParameters[requestDate]}") String requestDate) {
	return stepBuilderFactory.get("scopeStep1")
    				 .tasklet((contribution, chunkContext) -> {
                             	log.info(">>> ");
                                return RepeatStatus.FINISHED;
                             })
                             .build();
}
  • @StepScope
    • Tasklet / Reader / Processor / Writer에서 사용할 수 있다.
@Bean
pulbic Step scopeStep2() {
	return stepBuilderFactory.get("scopeStep2")
    .tasklet(scopeStep2Tasklet(null))
    .build();
}

@Bean
@StepScope
public Tasklet scopeStep2Tasklet(@Value("#{jobParameters[requestDate]}") String requestDate) {
	return (contribution, chunkContext) -> {
    	log.info(">>>");
        return RepeatStatus.FINISHED;
    }
}

 

  • Type
    • Spring Batch의 JobParameter는 Long / String / Double / Date 타입들을 지원한다.
    • Enum / LocalDate / LocalDateTime은 지원하지 않는다.
      • 그렇다면?
        • @Value의 특성을 이용하면 String을 매번 LocalDateTime으로 변경하지 않아도 된다. (아래 코드 참고)
          • setter메소드에 @Value를 선언 문자열로 받은뒤 원하는 타입으로 세팅
@Slf4j
@Getter
@NoArgsConstructor
public class CreateDateJobParameter {
	private LocalDate localDate;
    
    @Value("#{jobParameters[createDate]}")
    public void setCreateDate(String createDate) {
    	DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        this.createDate = LocalDate.parse(createDate, formatter);
    }
}

 

@JobScope로 설정해놨기 때문에 Job이 실행될 때 Bean이 실행된다. 그러면 job실행되는 시점에 jobParameter Bean이 생성되서 값이 세팅된 다음에 CreateDateJobParameter에 Injection이 된다.

public class JObParameterBatchConfiguration {
	private final CreateDateJobParameter jobParameter;
    
    @Bean(BATCH_NAME + "jobParameter")
    @JobScope
    public CreateDateJobParameter jobParameter() {
    	return new CreateDateJobParameter():
    }
}

 

그 다음부터는 아래와 같이 사용하면 된다.jobParameter.getCreateDate()만 호출하면 된다. 

@Bean(name = BATCH_NAME + "_reader")
@StepScope
public JpaPagingItemReader<Product> reader() {
	Map<String, Object> params = new HashMap<>();
    params.put("createDate", jobParameter.getCreateDate()); // 어디에서나 LocalDate을 가져올 수 있음
    
    return new JpaPagingItemReaderBuilder<Product>()
    			.name(BATCH_NAME + "_reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT p FROM Product p WHERE p.createDate = :createDate")
                .parameterValues(params)
                .build();
}

 

  • @JobScope, @StepScope의 특징(Late Binding)을 활용해보기
    •  @JobScope나 @StepScope가 선언된  Bean의 경우에는 Job이나 Step이 실행될 때 Bean이 생성되는 특징을 가진다. (일반 Spring처럼 애플리케이션이 로딩될 때 Bean이 생성되는 것이 아니라)
    • 그 뜻은 애플리케이션 실행 후에도 동적으로 reader / processor / writer bean 생성이 가능하다는 뜻
      • 예를 들아보자
        • 정산하는 시스템에서 ERP 연동이 2-30개가 되는데 하는 일이 비슷
        • 주문데이터 긁어와서 어디에 보내는 것~ 파라미터가 주문이냐 매출이냐 광고냐 그리고 읽어와야 될 테이블 이 값만 다르고 다른 건 전부 동일 
        • 그럴 때 마다 같은 클래스를 계속 생성할 수 없으니 LateBinding을 이용해서 파라미터로 주문으로 던지면 주문 테이블에서 읽어오는 리더로 바꿔서 배치를 돌리고, 광고면 광고 테이블에서 읽어오는 리더로 바꿔서 배치를 돌리는 것이다.
private StimpleStepBuilder<EaiEntity, EaiItem> readerAndProcessor(String dealCode, String txDateStr) {
	EaiReaderParameterDato parameterDto = createParameter(dealCode, txDateStr);
    
    EaiTaskletType readerProcessorType = EaiDatailType.findTaskletType(dealCode);
    EaiReaderFactory readerCreator = readerProcessorType.getReaderCreator();
    
    return stepBuilderFactory.get(stepName)
    		 	.<~>chunk(chunkSize)
                             .reader(readerCreator.create(chunkSize, emf, parameterDto)) 
                             .processor(eaiProcessor);
}
  • Batch 
    • 일괄처리
  • 사용 예
    • 엄청나게 큰 데이터를 가공해야 하는데 예를 들어 그 작업을 하루에 1번 정도 해줘야 한다. 
      • 엄청나게 큰 데이터를 가공해야 한다 ? 
        • CPU, I/O 등의 자원 처리에 서버 부하가 발생 
        • Request 처리를 올바르게 할 수 없다. 
        • 데이터 처리 중에 실패 발생하면, 다시 처음부터 하는게 아니라 끝난 지점부터하는게 좋음 (5만1번째)
      • 하루에 딱 1번 한다 ?
        • 하루에 한번만 돌게 하는 것을 구현하려고 API를 구성하는 것은 낭비일 수 있음 
        • 다른 사람이 한번 더 돌리는 실수를 막아야 함
  • SpringBatch는
    • '단발성으로 대용량의 데이터를 처리하는 어플리케이션' 이다.
      • Spring MVC를 사용함으로써 비즈니스 로직에 최대한 집중할 수 있었던 것 처럼 Spring Batch를 이용하면 배치 어플리케이션을 동작하는 데 필요한 로직에만 집중할 수 있다.
      • 예를 들어, 이미 실행된 경우 재실행을 불가능 하게 한다거나, 실패된 지점에서 부터 다시 동작시키게 할 수 있다거나... 이런 기능들을 SpringBatch가 쉽게 할 수 있도록 도와준다. 
      •  
  • Batch Application의 조건
    • 대용량 데이터 - 대용량의 데이터를 가져오거나 전달, 계산 등의 처리를 함 
    • 자동화 - 사용자 개입을 최소로 줄여 최대한 자동으로 동작될 수 있도록 한다.
    • 견고성 - 잘못된 데이터를 충돌/중단 없이 처리
    • 신뢰성 - 잘못된 부분을 추적할 수 있음 (로깅 / 알림)
    • 성능 - 지정된 시간 내에 처리, 다른 어플리케이션을 방해하지 않는다.

아래의 그림과는 표현한 방식이 살짝 다르다.
Job과 Step은 작업의 최소 단위,  Step을 통해 데이터 읽기/가공/결과기록 등의 로직을 묶어서 관리한다. Chunk는 한 번의 operation을 통해 다룰 데이터의 집합으로 각 Step은 설정에 정의된 Chunk 단위에 따라 데이터를 읽어 들이고 가공한 후 기록한다. Step의 설정에서 Chunk의 크기를 결정하는데 이는 데이터베이스의 한 row일 수도 있고, CSV 파일의 한 줄일 수도 있다. Chunk는 트랜잭션 관리 단위이기도 하여 마지막 처리 시점에 TX를 커밋하거나 롤백한다. ItemReader<Input> 은 작업 대상이 될 데이터를 읽어 들이는 컴포넌트이다. ItemProcessor<Input,Output>은 로직에 따라 가공하고, 변경된 데이터를 리턴한다. ItemWriter<Output>은 완성된 데이터를 정해진 곳에 저장/색인한다.

 

  • Batch Application 생성 - Simple ver.

Spring Initializer를 이용해 위의 세팅으로 프로젝트 생성

 

SpringBatch의 여러 기능을 쓰려면 필수로 추가해야하는 어노테이션
Job 생성과 Step 생성

run을 해보면..

 

log.info(">>>>> This is Step1") 가 잘 수행되어있음

 

 

 

 

 

  • Batch Application 생성 - hard ver.
    • PostgreSQL 환경에서 Spring Batch 실행해보기 
    • 먼저, Spring Batch를 작동하기 위해 필요한 메타 데이터 테이블을 세팅한다. 
      • Spring Batch 의 메타데이터는..
        • 이전에 실행한 Job에 대한 기록
        • 실패한 Batch Parameter 기록, 성공한 Job 기록
        • 재실행시 시작해야 될 포인트 기록
        • Job이 갖고 있는 Step에 대한 Histroy tracking (성공/실패 Step 기록)

소스 라이브러리에서 spring-batch-core 디렉토리 안에 있다.

 

더보기

-- Autogenerated: do not edit this file

CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL TIMESTAMP DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME TIMESTAMP NOT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;

위의 쿼리문을 이용해 table을 생성해준다. 나는 AWS RDS를 이용해 PostgreSQL을 설치해 주었다. 

 

application.yml에 datasource 설정 완료

 

 

참조

medium.com/myrealtrip-product/spring-batch-%EC%B2%98%EC%9D%8C%EB%B6%80%ED%84%B0-%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0-3c6a5db0646d

 

Spring Batch, 처음부터 시작하기

커머스 서비스 개발자를 위한 Spring Batch 입문

medium.com

jojoldu.tistory.com/325?category=902551

 

2. Spring Batch 가이드 - Batch Job 실행해보기

이번 시간에는 간단한 Spring Batch Job을 생성 & 실행하면서 전반적인 내용을 공부해보겠습니다. 작업한 모든 코드는 Github에 있으니 참고하시면 됩니다. 2-1. Spring Batch 프로젝트 생성하기 기본적인

jojoldu.tistory.com

 

+ Recent posts