지정한 job만 실행될 수 있도록 application.yml에는 아래 내용을 추가

spring:
  batch:
    job:
      names: ${job.name:NONE}

main 함수에는 아래와 같은 job이 실행된 다음 종료될 수 있는 코드를 추가하였다.

 ConfigurableApplicationContext applicationContext = SpringApplication.run(TestBatchApplication.class, args);
        System.exit(SpringApplication.exit(applicationContext));

 

Dashboard > Jenkins관리 > Configure System 에서 아래의 설정을 해준다. 

1. Global properties (jar위치, jvm 옵션 등등)

2. Build Timestamp

Timestamp 플러그인을 다운 받아 아래와 같이 설정 

Jenkins에서 Freestyle Project를 선택해서 shell script를 실행하게 하였고,

AWS EC2에 있는 jar 파일을 직접 실행하게끔 하였다. 

 

지정한 jar파일에 접근할 수 있도록 권한 주기

sudo chown -R ec2-user:ec2-user /var/lib/jenkins/workspace/

 

 

위에서 구성한 Jenkins Project들의 이름은 Pipeline 스크립트에서 사용된다.

 

파이프라인 설정

 

Pipeline 아이템 추가

 

파이프라인을 설정해주는데 나중에는 Trigger는 'Build periodically'를 선택해 일정 주기로 실행될 수 있게 해줄 것이다. 

파이프라인 스크립트는 아래와 같이 간단하게 작성해본다.

pipeline {
    agent none
    stages {
        stage('a') {
            steps {
                build 'ATargetJob'
            }
        }
          stage('b') {
            steps {
                build job: "BTargetJob", wait : true
            }
        }
        stage('c') {
            steps {
                build job: "CTargetJob", wait : true
            }
        }
        stage('d') {
            steps {
                build job: "DTargetJob", wait : true
            }
        }
    }
}

 

순차적으로 a->b->c->d가 실행되고, 중간에 실패되면 pipeline은 중지된다. 

AWS EC2 환경에 설치된 Jenkins로 실행하다 보니 로컬 환경에서와는 달리 실패되는 부분이 있었고, 결국 11번의 시도 끝에 pipeline이 정상적으로 실행되었다. 

 

 

 

이제 큰 틀은 짰으니 세세한 부분을 신경쓰러 떠나보겠다...

하루 그리고 반나절을 투자해 Spring Batch를 Spring Cloud Data Flow로 관리하려고 했다. 배치 잡 실행과 모니터링을 중앙에서 관리하고, UI도 심플하니 맘에 들어서였다. 그러나 결론은 실패..... 실패한 이유는 사실 프로젝트 일정에 맞추기엔 docker에 대한 사전 지식 부족으로 시간을 무한정 투자할 수가 없어서였다.

 

우선 SCDF설치를 하는 방법엔 여러가지가 있는데 (Docker Compose, Cloud Foundry, Kubernetes) 그 중에서 나는 docker를 이용해 로컬에 설치하는 방법을 택했다. 

https://dataflow.spring.io/docs/installation/local/docker/

 

Spring Cloud Data Flow(SCDF) 서버에 배치 잡 애플리케이션을 등록하려면 다음과 같은 단계를 따를 수 있다.

  1. 배치 잡 애플리케이션 빌드
    1. Spring Batch를 사용하여 배치 잡을 작성합니다.
    2. 배치 잡 애플리케이션을 빌드하여 실행 가능한 JAR 파일을 생성합니다.
  2. SCDF 서버 설치
    1. 도커로 설치
  3. 애플리케이션 등록
    1. SCDF 서버 대시보드에서 "Create App" 버튼을 클릭하여 애플리케이션을 등록합니다.
      "Type"을 "batch"로 선택하고, "Name"에 애플리케이션 이름을 입력하고, "URI"에 빌드한 JAR 파일의 경로를 입력합니다. (애플리케이션을 등록하면 SCDF 서버에 배치 잡 애플리케이션을 등록한 것)
  4. 배치 잡 실행
    1. SCDF 서버 대시보드에서 배치 잡 애플리케이션을 선택하고 실행을 시작합니다.
    2. 실행 구성 및 배치 잡 인수를 지정할 수 있습니다.

 

컴퓨터에 docker설치도 안되어 있던 터라 도커부터 설치를 해주고.. 

아래 docker-compose 파일을 실행해준다. 해당 파일에는 mysql, rabbitmq, dataflow-server, skipper-server, promateus, grafana 서비스를 등록한 내용이 담겨있다. spring cloud data flow를 설치하려면 db, messaging, dataflow-server,skipper-server가 필요하고 scheduling을 위해선 promateus, grafana 설정이 추가로 필요하다. 

 

version: '3'

services:
  mysql:
    image: mysql:5.7.25
    container_name: dataflow-mysql
    environment:
      MYSQL_DATABASE: dataflow
      MYSQL_USER: root
      MYSQL_ROOT_PASSWORD: rootpw
    networks:
      - dataflow-network
    expose:
      - '3306'
    ports:
      - '3306:3306'

  rabbitmq:
    image: rabbitmq:3.7.17-management-alpine
    container_name: dataflow-rabbitmq
    networks:
      - dataflow-network
    ports:
      - '5672:5672'
      - '15672:15672'

  dataflow-server:
    image: springcloud/spring-cloud-dataflow-server:2.2.1.RELEASE
    container_name: dataflow-server
    volumes:
      - "./tmp:/tmp"
      - "./workspace:/workspace"
    networks:
      - dataflow-network
    ports:
      - "9393:9393"
    environment:
      - spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.host=rabbitmq
      - spring.cloud.skipper.client.serverUri=http://skipper-server:7577/api
      - spring.cloud.dataflow.applicationProperties.stream.management.metrics.export.prometheus.enabled=true
      - spring.cloud.dataflow.applicationProperties.stream.spring.cloud.streamapp.security.enabled=false
      - spring.cloud.dataflow.applicationProperties.stream.management.endpoints.web.exposure.include=prometheus,info,health
      - spring.cloud.dataflow.grafana-info.url=http://localhost:3000
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow
      - SPRING_DATASOURCE_USERNAME=root
      - SPRING_DATASOURCE_PASSWORD=rootpw
      - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
    depends_on:
      - rabbitmq
    entrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-dataflow-server.jar"

  skipper-server:
    image: springcloud/spring-cloud-skipper-server:2.1.2.RELEASE
    container_name: skipper
    volumes:
      - "./tmp:/tmp"
      - "./workspace:/workspace"
    networks:
      - dataflow-network
    ports:
      - "7577:7577"
      - "9000-9010:9000-9010"
      - "20000-20105:20000-20105"
    environment:
      - SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_LOW=20000
      - SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_HIGH=20100
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow
      - SPRING_DATASOURCE_USERNAME=root
      - SPRING_DATASOURCE_PASSWORD=rootpw
      - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
    entrypoint: "./wait-for-it.sh mysql:3306 -- java -Djava.security.egd=file:/dev/./urandom -jar /spring-cloud-skipper-server.jar"

  # Grafana is configured with the Prometheus datasource.
  # Use `docker exec -it prometheus /bin/sh` to log into the container
  prometheus:
    image: springcloud/spring-cloud-dataflow-prometheus-local:2.2.1.RELEASE
    container_name: prometheus
    volumes:
      - 'scdf-targets:/etc/prometheus/'
    networks:
      - dataflow-network
    ports:
      - '9090:9090'
    depends_on:
      - service-discovery

  # The service-discovery container. Required for Prometheus setup only
  # Use `docker exec -it service-discovery /bin/sh` to log into the container
  service-discovery:
    image: springcloud/spring-cloud-dataflow-prometheus-service-discovery:0.0.4.RELEASE
    container_name: service-discovery
    volumes:
      - 'scdf-targets:/tmp/scdf-targets/'
    networks:
      - dataflow-network
    expose:
      - '8181'
    ports:
      - '8181:8181'
    environment:
      - metrics.prometheus.target.cron=0/20 * * * * *
      - metrics.prometheus.target.filePath=/tmp/scdf-targets/targets.json
      - metrics.prometheus.target.discoveryUrl=http://dataflow-server:9393/runtime/apps
      - metrics.prometheus.target.overrideIp=skipper-server
      - server.port=8181
    depends_on:
      - dataflow-server

  # Grafana SCDF Prometheus pre-built image:
  grafana:
    image: springcloud/spring-cloud-dataflow-grafana-prometheus:2.2.1.RELEASE a
    container_name: grafana
    networks:
      - dataflow-network
    ports:
      - '3000:3000'

networks:
  dataflow-network:

volumes:
  scdf-targets:

 

docker -f XXX.yml up 

명령어를 통해 위 파일을 실행해주면 아래와 같이 http://localhost:9393/dashboard/ 에서 Data Flow 화면을 확인할 수 있다. 

여기서 더 이상의 진행은 중단했다. APP을 등록하는 것에서 애를 먹기도 먹거니와 이걸 결국에는 운영 EC2 환경에서 다양한 변수들을 고려하여 세팅을 해줘야 하는데 허들이 꽤나 있을 것으로 예상되었다. 솔직히 이것만 붙잡고 하고 싶은 마음이 굴뚝 같았으나 (승부욕 발동) 새로운 기술을 써보겠다는 개인적인 사리사욕보다는 우선은 프로젝트 일정에 맞추는 것 더 중요하다 판단되어 일단 이 정도까지 알아본 것으로 마무리하고, 원래 계획대로 Jenkins에서 Spring Batch를 실행하기로.. (쥬륵)너무 아쉽고, 꼭 프로젝트 마치고 다시 도전해봐야 겠다. 

 

팀에서 처음으로 Spring Batch를 사용하다 보니 문제 상황이 생기면 물어볼수도 없어서 혼자 끙끙대는 중인데 가끔 스스로 에게 하는 질문이나 의문 자체가 잘못된 느낌도 들고, Spring Batch에 대해 다시 처음부터 공부하고 시작해야 하는게 아닌가 하는 불안감도 들고 개발 하는 내내 싱숭생숭하다; 개발하면서 또 이런 적은 처음이라 당황스럽다-_-; 일단 개발은 시작했는데 내일은 다시 좀 기본 컨셉, 개념들을 살펴보고 코드들을 전체적으로 훑어봐야겠다..

 

1) 복잡한 Query를 질의해야 한다.

 

단순 조인으로만 이루어진 쿼리가 아니라 union all이나 여러 개의 내부 select절이 포함된 복잡한 쿼리를 Reader에서 던져야 했다. (JPA를 사용하는 스프링 부트 프로젝트에선 QueryDSL로도 좀 만들기 까다로운 경우에는 Native Query를 사용해서 질의를 해왔다.) 후보에는 JpaPagingItemReader, JdbcPagingItemReader, JdbcCursorItemReader이 올랐고, Cursor의 경우는 Socket Time Out의 문제로 Connection 이 끊어지지 않게끔 설정을 해줘야 되는 번거로움이 존재했다. 다른 블로그에서도 Paging을 추천하기도 했고, Jpa가 익숙해서 JpaPagingItemReader를 사용하고 있었는데 JpaPagingItemReader로는 복잡한 쿼리를 질의하는 케이스를 아무리 찾아봐도 보이지가 않는다(그때는..) 여차 저차 비슷한 걸 찾아서 JpaNativeQueryProvider를 이용해보았는데 실패; (메인 쿼리 전에 Jpa 에서 사전? 쿼리를 날리는데 이게 문제가 된다) 그래서 JdbcPagingItemReader를 사용해보려고 했는데 setSelectClase()같이 쿼리를 구분해서 넣어주기엔 쿼리가 너무 복잡해서 어쩔 수 없이 JdbcCursorItemReader를 사용하게 됐다. 

 

    @Bean
    public JdbcCursorItemReader<TestEntity> jdbcCursorItemReader() {
        return new JdbcCursorItemReaderBuilder<TestEntity>()
                            .fetchSize(chunkSize)
                            .dataSource(dataSource)
                            .rowMapper(new TestEntityMapper()) // 원래는 이 부분에서 BeanPropertyRowMapper를 사용했었음
                            .sql(getQuery())
                            .name("jdbcCursorItemReader")
                            .build();
    }

간단하게 바꿔서 가져와 봤다. 여기서 또 다른 난관을 맞닥뜨리게 되는데; rowMapper 부분이다;

현재 DB는 PostgreSQL을 쓰고 있고, 컬럼 타입이 Array인 경우가 많다. 이런 경우에는 아래와 같이 타입을 선언해주고 CRUD를 처리했는데 JdbcCursorItemReader의 rowMapper로 전달한 BeanPropertyRowMapper가 해당 타입을 변환해주지 못하고 에러를 뱉는다. 

@Column(name = "emails", columnDefinition = "varchar[]")
@Type(type = "~~~.CustomStringArrayType")
private String[] emails;

그래서 RowMapper를 Implements한 Custom Mapper를 생성해주고 아래와 같이 Array.타입을 변환해 mapping해주게 하였다.

public class TestEntityMapper implements RowMapper<TestEntity> {
    @Override
    public RiExpirationTarget mapRow(ResultSet rs, int rowNum) throws SQLException {
        TestEntity testEntity = new TestEntity();
        testEntity.setEmails((String[]) rs.getArray("emails").getArray());
     	~~~~
        return testEntity;
    }
}

여기까지 왔는데 아래 포스팅을 발견...  갑자기 이런 의문이 든다;

아무리 복잡한 쿼리라고 해도 우아한 형제들에서 쓰는 쿼리보다 복잡하려나? 내가 혹시 이상한 길로 간게 아닐까 하는 의구심이 들며... 우선은 이번 프로젝트를 마치고 바로 해당 부분을 리팩토링 하는 것을 목표로 삼겠다.. 

https://techblog.woowahan.com/2662/

 

2) Processor에서 DB에 접근해 데이터를 가져오는 것이 찝찝한 느낌.. 그래도 될까? (Reader에서 처리되어야 할 것 같은 너낌 적인 너낌)

 

Spring Batch의 Processor 단계에서 DB에 접속해서 데이터를 가져오는 로직이 존재해도 된다고 한다.

Processor는 일반적으로 데이터의 변환 및 가공을 수행하는 단계로 데이터 소스에서 필요한 데이터를 가져와 가공하는 것이 일반적이라고 한다.

 

그러나 다음과 같은 사항에 유의해야 한다고 한다.

  1. 성능 문제: Processor가 데이터 소스에 매번 접속하고 데이터를 가져오는 것은 성능상 좋지 않을 수 있습니다. 따라서 대량의 데이터를 처리할 때는 Chunk 단위로 처리하거나, 데이터베이스 쿼리 최적화를 고려해야 합니다.
  2. 트랜잭션 처리: Processor에서 데이터 소스에 접속하고 데이터를 가져올 때, 트랜잭션 처리에 대한 고민이 필요합니다. 

따라서 Processor에서 DB에 접속해서 데이터를 가져오는 로직이 존재해도 되지만, 성능과 트랜잭션 처리에 대한 고민이 필요한데 Spring Batch에서는 일반적으로 Chunk 단위로 트랜잭션 처리를 하므로, Processor에서도 Chunk 단위로 트랜잭션 처리를 해야한다. 

Processor에서 사용하는 메소드에 @Transactional을 선언할 수 있지만, 이 경우 Chunk 단위로 트랜잭션이 처리되지 않을 수 있다. 따라서 @Transactional을 사용할 경우, Processor가 Chunk 단위로 실행되도록 설정해야 한다.

 

Chunk 단위로 트랜잭션 처리를 하기 위해서는 ItemReader에서 데이터를 읽어올 때, 트랜잭션을 시작하고, ItemWriter에서 데이터를 처리한 후에 트랜잭션을 커밋하면 된다. 이렇게 Chunk 단위로 트랜잭션 처리를 하면, Processor에서 DB에 접속해서 데이터를 가져오는 로직을 작성해도 안전하게 사용할 수 있게 된다.

1) 데이터 일부가 read/write이 안된다..

같은 Entity를 Reader로 읽고, Writer로 Update하려니까 전체가 100개라고 하면 약 50개 밖에 Update가 안되는 문제가 있었다. JpaPagingItermReader를 사용해서 그런 것이었고, 원인은 update를 chunk 단위로 함에 따라 limist과 offset를 이용해 select하는 결과값이 달라져서였다. 해결 방법은 Cursor를 사용하거나 PagingReader를 Overriding 하는 것이고, 자세한 내용은 아래에 글에 나와있다. (빨리 원인을 찾고 해결해서 다행..ㅠ)

 

https://jojoldu.tistory.com/337

 

 

Spring Batch Paging Reader 사용시 같은 조건의 데이터를 읽고 수정할때 문제

안녕하세요. 이번 시간에는 Spring Batch를 사용하시는 분들이 자주 묻는 질문 중 하나인 같은 조건의 데이터를 읽고 수정할때 어떻게 해야하는지 에 대해서 소개드리려고 합니다. 모든 코드는 Githu

jojoldu.tistory.com

 

2) 데이터를 가공해야만 한다..

DB에 저장된 원본 데이터 1row를 읽고 다른 테이블에 1row를 저장하는 설계를 바탕으로 개발을 했는데 N rows : 1 row의 데이터 연관관계가 있었다; 이에 따라 N rows를 1rows로 만드는 Step(2개)을 만들어 메인 Step전에 수행시켜 줬다. 

 

 

Main Step이 실행되기 전에 Sub1과 Sub2를 거쳐야 하며 Sub1이 실패가 나던, Sub2가 실패가 나던 무조건 StepMain은 실행해야 되는 논리구조다.

  • Sub1, Sub2 둘 다 실패가 나더라도 Step Main 은 실행
  • Sub1이 실패하면 Sub2->Main, Sub2가 실패하면 면 Sub1->Main

 

 

 

위와 같은 논리 구조를 아래와 같이 구현해봤는데, 살짝 의구심이 들었지만 테스트를 해보니 원하는 대로 동작한다. 

    @Bean
    public Job expirationNotificationJob() {
        return jobBuilderFactory.get("expirationNotificationJob")
                                .start(sub1ExpirationStep())
                                    .on("*")
                                    .to(sub2ExpirationStep())
                                    .on("*")
                                    .to(mainExpirationStep())
                                    .on("*")
                                    .end()
                                .end()
                                .build();
    }

Youtube 개발자 인큐티비의 SpringBatch 편을 참고하여 질문지를 리스트업 했습니다. 

 

  • 왜 스프링 배치를 사용하는가?
    • 대용량 데이터를 처리해야 함
    • 사용자 개입 없이 동작
    • 로깅, 통계처리, 트랜잭션 등의 비즈니스 로직 외에 배치 어플리케이션에 필요한 기능 사용 가능
    • 지정한 시간 내에 다른 어플리케이션을 방해하지 않고 수행
    • 충돌이나 중단 되었을 때 컨트롤이 가능함
  • 멱등성은 어떻게 유지하는가?
    • 멱등성 : 연산을 여러번 적용하더라도 결과가 달라지지 않는 성질
    • Spring Batch Job Parameter를 사용하여 외부에서 값을 주입받도록 하여 제어가 불가능한 코드를 제거한다.
      • ex) LocalDateTime.now()
    • 멱등성이라는 패러다임이 Spring Batch와는 적합하지 않다는 내용의 의견도 많다. (https://namocom.tistory.com/752)
      • Job Parameter를 이용해 외부에서 주입하는 것은 책임의 이동 정도로 이해하자는 의견도..
  • Spring Batch 메타 데이터 테이블은 어떤 것이 있는가?
    • (맨 하단 ERD 참고)
    • BATCH_JOB_INSTANCE (최상위)
      • Job Instance 객체의 정보를 담고 있음 
    • BATCH_JOB_EXECUTION_PARAMS
      • Job Parameter 정보를 담고 있음, Job 실행 시 사용된 파라미터 저장
      • 정규화되지 않은 형태의 테이블로 TYPE_CD 컬럼에서 저장되는 파라미터 타입을 가진다. 
    • BATCH_JOB_EXECUTION
      • Job Execution 객체의 정보를 가지고 있음, Job이 run 할 때마다 row가 추가됨
    • BATCH_STEP_EXECUTION
      • Step Execution 객체의 정보와 대응되는데 하나의 Job Execution에서 사용하는 Step 개수만큼 테이블의 row에 추가된다.
      • 배치가 돌아서 처리한 개수를 알고 싶을 때는 이 테이블을 찾아보면 된다. 
    • BATCH_JOB_EXECUTION_CONTEXT
      • 하나의 Job Execution 에 대해 하나의 Job Execution Context가 존재하며, Job레벨의 모든 데이터를 다 가지고 있다.
    • BATCH_STEP_EXECUTION_CONTEXT
      • 하나의 Step Execution 에 대해 하나의 Step Execution Context가 존재하며, 
      • JobInstance가 중단된 위치에서 다시 시작할 수 있도록 실패 후 검색되어야 하는 정보도 담고 있음
  • 배치 중간 실패하면 어떻게 처리하는지?
    • skip
      • 데이터를 처리하는 동안 설정된 Exception이 발생했을 경우, 해당 데이터 처리를 건너뛰는 기능
      • default 값은 0으로 사용을 원하는 경우 반드시 0 이상의 숫자를 입력해주고, 어떤 Exception을 skip 할 것인지 반드시 명시해주어야 한다. 
      • chunk 내부에서 이뤄지는데 Read/Process/Write 하는 과정에 설정해줄 수 있다. 
    • retry (아래 그림 참고)
      • 데이터를 Process/Write 하는 과정에서 설정된 Exception이 발생했을 경우, 지정한 정책에 따라 데이터 처리 재시도 하는 기능.
      • Read과정에서 주로 발생하는 FlatFileParseException 에 대한 문제는 대부분  Skip에서 처리가 된다.
      • 예를 들면 DeadlockLoserDataAccessException 발생 시 Retry가 일어나도록 설정할 수 있다. 다른 프로세스에서 처리중인 데이터에 새로운 프로세스가 접근하면 Locak이 걸려 있어 에러가 발생하는데 Retry를 하면 성공할 수 있을 것이다. 
  • 트랜잭션 관리 왜 청크단위로 하는가?
    • Chunk 
      • 각 커밋 사이에 처리되는 row 수
    • Chunk 지향 처리란?
      • Chunk 단위로 트랜잭션을 다루는 것 
      • Reader와 Processor에서는 1건씩 다뤄지고, Writer에선 Chunk 단위로 처리
        • Reader에서 데이터를 하나 읽어옵니다
        • 읽어온 데이터를 Processor에서 가공합니다
        • 가공된 데이터들을 별도의 공간에 모은 뒤, Chunk 단위만큼 쌓이게 되면 Writer에 전달하고 Writer는 일괄 저장합니다.
    • Why Chunk?
      • 커밋을 매번 하면 비용이 많이 듭니다. 데이터가 많은 경우라면 (Spring Batch를 사용한다면 당연하겠죠?) 매번 커밋을 하는 것은 이상적이지 않고, 각 트랜잭션에서 가능한 한 많은 항목을 처리하는 것이 바람직합니다. 이러한 이유로 한 커밋 내에서 처리하는 수를 chunk로 관리하게 된 것입니다. 
      • Chunk-oriented 프로세싱을 하게 되면 다양한 기능들을 사용할 수 있는 것이 장점인데 skip, retry, 특정 Exception에 대한 Rollback, 다양한 ItemReader 그리고Cursor, Paging 등이 대표적이다.
    • Page Size와 Chunk Size
      • PagingItemReader를 사용하면 보이는 Page Size는 한번에 조회할 Item의 양이고, Chunk Size는 한번에 처리될 트랜잭션 단위이다.
      • 보편적으로 두개의 사이즈 크기는 일치하는게 좋다. 
  • Cursor 기반 vs Paging 기반
    • Cursor는 한칸씩 커서를 옮기면서 데이터 1 Row씩을 가져온다.
      • 배치 처리가 완료될 때까지 데이터를 읽어오기 때문에 DB Connection Time이 Paging보다 길다. 
      • 모든 데이터를 메모리에 저장하기 때문에 메모리 사용량이 많다.
    • Paging은 설정한 PageSize 만큼 데이터를 가져오며 데이터 결과의 순서가 보장될 수 있도록 order by 사용 필요
      • PageSize만큼 DB Connection을 읽고 종료한다. 따라서 Cursor에 비해 상대적으로 DB Connection Time이 적다. 
      • 하지만 트랜잭션을 여러번 타야하는 단점이 있긴 하다. 
  • Multi Thread & Partitioning
    • 정해진 시간 안에 많은 데이터를 처리하기 위해 성능을 높이기 위해 사용하는 방법
    • 서비스에 적재된 데이터가 적을 경우에는 Spring Batch의 기본 기능들만 사용해도 큰 문제가 없으나, 데이터가 엄청나게 많이 쌓일시 배치 애플리케이션 역시 확장이 필요
    • Multi-threaded Step (아래 그림 참고)
      • 단일 Step을 수행할 때, 해당 Step 내의 각 Chunk를 별도의 여러 쓰레드에서 실행하는 방법
      • 정한 개수(throttleLimit)만큼의 스레드를 생성하여 수행하는데 ItemReader는 반드시 Thread-safe인지 확인해야 하며(데이터를 중복으로 읽어 오지 않게 하기 위해) 스프링 배치에서 제공하는 것중 JdbcPagingItemReader, JpaPagingItemReader가 Thread-safe하다.
    • Partitioning
      • Master가 데이터를 파티셔닝 한 다음 Slave가 개별 스레드를 통해 각 파티션을 처리하는 방식
      • 각 SlaveStep은 ItemReader / ItemProcessor / ItemWriter 등을 갖고 동작하며 작업을 독립적으로 병렬 처리합니다.
    • Multi-threaded 는 Thread-safe를 신경써야 하나 Partitioning은 Thread-safe하지 않아도 됩니다. 
  • tasklet model vs chunk (reader,processor,writer)
    • Step은 Tasklet 혹은 Chunk로 처리할 수 있다. 
    • When to use
      • Tasklet 
        • 실행할 작업이 간단하여 집계가 필요없고, 실행만 필요한 경우
      • Chunk
        • 실행할 작업이 복잡하고 청크 지향 처리를 사용하는 읽기, 처리 및 쓰기와 관련된 작업 실행이 포함된다고 가정합니다.
  • Batch 실행은 어떻게 하는지 
    • Jenkins로 
  • 모니터링은 어떻게? 
    • 예전에는 Spring Batch Admin이 있었는데 deprecated 되었고, 현재는 spring에서 Spring Cloud Data Flow 를 사용하라고 한다. 
    • 요건 직접 사용해보고 내용 추가해보겠습니다~
 
 

 

Spring Batch Meta Table


 

Spring Batch Retry


Multi-thread Step


 

AsyncItemProcessor / AsyncItemWriter


Cursor vs Paging

 


참고

https://jojoldu.tistory.com/489

https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#commitInterval

https://www.egovframe.go.kr/wiki/doku.php?id=egovframework:rte2:brte:batch_core:skip_repeat_retry 

https://velog.io/@backtony/Spring-Batch-%EB%A9%80%ED%8B%B0-%EC%8A%A4%EB%A0%88%EB%93%9C-%ED%94%84%EB%A1%9C%EC%84%B8%EC%8B%B1

아래 강의를 보고 정리한 내용입니다. 

[우아한테크세미나] 190926 우아한스프링배치 by 우아한형제들 이동욱님  https://youtu.be/_nkJkWVH-mo

 

  • JobParameter
    • Spring Batch는 외부에서 파라미터를 주입받아 Batch 컴포넌트에서 사용 할 수 있다. 
    • 사용법 : @Value("#{jobParameters[파라미터명]}") 타입 이름
  • @JobScope
    • Step에서 사용할 수 있고, Job이 실행되는 시점에 Bean이 생성된다.
    • 사용 예
      • @JobScope라고 선언해줘야 호출한 곳에서 넘겨진 파라미터를 받을 수 있다.
      • 컴파일 에러 방지를 위해 우선은 null값을 넘겨준다.
@Bean
public Job scopeJob() {
	return jobBuilderFactory.get("scopeJob")
    			.start(scopeStep1(null))
                            .next(scopeStep2())
                            .build();
}

@Bean
@JobScope
public Step scopeStep1(@Value("#{jobParameters[requestDate]}") String requestDate) {
	return stepBuilderFactory.get("scopeStep1")
    				 .tasklet((contribution, chunkContext) -> {
                             	log.info(">>> ");
                                return RepeatStatus.FINISHED;
                             })
                             .build();
}
  • @StepScope
    • Tasklet / Reader / Processor / Writer에서 사용할 수 있다.
@Bean
pulbic Step scopeStep2() {
	return stepBuilderFactory.get("scopeStep2")
    .tasklet(scopeStep2Tasklet(null))
    .build();
}

@Bean
@StepScope
public Tasklet scopeStep2Tasklet(@Value("#{jobParameters[requestDate]}") String requestDate) {
	return (contribution, chunkContext) -> {
    	log.info(">>>");
        return RepeatStatus.FINISHED;
    }
}

 

  • Type
    • Spring Batch의 JobParameter는 Long / String / Double / Date 타입들을 지원한다.
    • Enum / LocalDate / LocalDateTime은 지원하지 않는다.
      • 그렇다면?
        • @Value의 특성을 이용하면 String을 매번 LocalDateTime으로 변경하지 않아도 된다. (아래 코드 참고)
          • setter메소드에 @Value를 선언 문자열로 받은뒤 원하는 타입으로 세팅
@Slf4j
@Getter
@NoArgsConstructor
public class CreateDateJobParameter {
	private LocalDate localDate;
    
    @Value("#{jobParameters[createDate]}")
    public void setCreateDate(String createDate) {
    	DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        this.createDate = LocalDate.parse(createDate, formatter);
    }
}

 

@JobScope로 설정해놨기 때문에 Job이 실행될 때 Bean이 실행된다. 그러면 job실행되는 시점에 jobParameter Bean이 생성되서 값이 세팅된 다음에 CreateDateJobParameter에 Injection이 된다.

public class JObParameterBatchConfiguration {
	private final CreateDateJobParameter jobParameter;
    
    @Bean(BATCH_NAME + "jobParameter")
    @JobScope
    public CreateDateJobParameter jobParameter() {
    	return new CreateDateJobParameter():
    }
}

 

그 다음부터는 아래와 같이 사용하면 된다.jobParameter.getCreateDate()만 호출하면 된다. 

@Bean(name = BATCH_NAME + "_reader")
@StepScope
public JpaPagingItemReader<Product> reader() {
	Map<String, Object> params = new HashMap<>();
    params.put("createDate", jobParameter.getCreateDate()); // 어디에서나 LocalDate을 가져올 수 있음
    
    return new JpaPagingItemReaderBuilder<Product>()
    			.name(BATCH_NAME + "_reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT p FROM Product p WHERE p.createDate = :createDate")
                .parameterValues(params)
                .build();
}

 

  • @JobScope, @StepScope의 특징(Late Binding)을 활용해보기
    •  @JobScope나 @StepScope가 선언된  Bean의 경우에는 Job이나 Step이 실행될 때 Bean이 생성되는 특징을 가진다. (일반 Spring처럼 애플리케이션이 로딩될 때 Bean이 생성되는 것이 아니라)
    • 그 뜻은 애플리케이션 실행 후에도 동적으로 reader / processor / writer bean 생성이 가능하다는 뜻
      • 예를 들아보자
        • 정산하는 시스템에서 ERP 연동이 2-30개가 되는데 하는 일이 비슷
        • 주문데이터 긁어와서 어디에 보내는 것~ 파라미터가 주문이냐 매출이냐 광고냐 그리고 읽어와야 될 테이블 이 값만 다르고 다른 건 전부 동일 
        • 그럴 때 마다 같은 클래스를 계속 생성할 수 없으니 LateBinding을 이용해서 파라미터로 주문으로 던지면 주문 테이블에서 읽어오는 리더로 바꿔서 배치를 돌리고, 광고면 광고 테이블에서 읽어오는 리더로 바꿔서 배치를 돌리는 것이다.
private StimpleStepBuilder<EaiEntity, EaiItem> readerAndProcessor(String dealCode, String txDateStr) {
	EaiReaderParameterDato parameterDto = createParameter(dealCode, txDateStr);
    
    EaiTaskletType readerProcessorType = EaiDatailType.findTaskletType(dealCode);
    EaiReaderFactory readerCreator = readerProcessorType.getReaderCreator();
    
    return stepBuilderFactory.get(stepName)
    		 	.<~>chunk(chunkSize)
                             .reader(readerCreator.create(chunkSize, emf, parameterDto)) 
                             .processor(eaiProcessor);
}

제가 맡고있는 SpringBoot 프로젝트에선 JPA를 사용하고 있으며 분산 환경 데이터베이스를 구축하기 위해  AbstractRoutingDataSource 및 Spring Data JPA를 사용하여 Dynamic DataSource 라우팅을 하고 있습니다. 

@Data
@Configuration
@ConfigurationProperties(prefix = "test")
public class NamedDataSources {
   private List<NamedDataSource> namedDataSources;
}
@Getter
@Setter
public class NamedDataSource {
   private String name;
   private HikariConfig hikari;
}
  1. yml 파일에 입력된 db 설정 정보가 NamedDataSources 클래스에 로딩된다.
  2. DataSource를 생성한다. (DetermineRoutingDatasource)
    1. DetermineRoutingDataSource 인스턴스 생성
    2. DatermineRoutingDataSource에는 key,value 형태로 저장된 NamedDataSources 정보가 있다.
    3. Default 데이터소스가 등록된다.
    4. DataSoucre로 DetermineRoutingDataSource가 리턴된다.
  3. EntityManager Bean (LocalContainerEntityManagerFactory)을 등록한다.
    1. Datasource지정-> (2번에서 생성한 DetermineRoutingDataSource)로 설정
    2. Hibernate Property, Entity가 위치한 Package 지정
    3. Hibernate 기반으로 동작하는 것을 지정하는 JPA Vendor 설정
  4. TransactionManager Bean을 등록한다.
    1. LocalContainerEntityManagerFactory Bean을 주입받음
    2. Datasource와 EntityManagerFactoryBean에서 생성되는 EntityManagerFactory를 지정

 

@Configuration
@EnableJpaRepositories(basePackages = "com.xx.xxx.xx.xx",
        transactionManagerRef = "transcationManager",
        entityManagerFactoryRef = "entityManager")
@EnableTransactionManagement
public class NamedRoutingDataSources {

    private final TestProperties jpaProps;
    private final NamedDataSources namedDataSources;

    public NamedRoutingDataSources(TestProperties jpaProps, NamedDataSources namedDataSources) {
        this.jpaProps = jpaProps;
        this.namedDataSources = namedDataSources;
    }

    @Primary
    @Bean
    public DataSource createRoutingDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();

        for (NamedDataSource namedDataSource : namedDataSources.getNamedDataSources()) {
            targetDataSources.put(DatabaseCluster.valueOf(namedDataSource.getName()), new HikariDataSource(namedDataSource.getHikari()));
        }

        DetermineRoutingDataSource routingDataSource = new DetermineRoutingDataSource();
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(targetDataSources.get(DatabaseCluster.MZ));

        return routingDataSource;
    }

    @Bean(name = "entityManager")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryBean() {
        LocalContainerEntityManagerFactoryBean factoryBean = new LocalContainerEntityManagerFactoryBean();
        factoryBean.setDataSource(createRoutingDataSource());
        factoryBean.setPackagesToScan("com.xx.xxx.xx.xx");
        factoryBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        factoryBean.setJpaProperties(initJpaHibernateProperties());
        return factoryBean;
    }

    private Properties initJpaHibernateProperties() {
        Properties properties = new Properties();
        properties.put(AvailableSettings.FORMAT_SQL, jpaProps.getProperties().getHibernate().isFormatSql());
        properties.put(AvailableSettings.SHOW_SQL, jpaProps.getProperties().getHibernate().isShowSql());
        return properties;
    }

    @Bean(name = "transcationManager")
    public JpaTransactionManager transactionManager(
            @Autowired @Qualifier("entityManager") LocalContainerEntityManagerFactoryBean entityManagerFactoryBean) {
        return new JpaTransactionManager(entityManagerFactoryBean.getObject());
    }
}

SpringBoot 프로젝트가 로딩 될 때 위와 같이 설정이 됩니다.

크게 보자면 먼저 Datasource를 생성하고, Spring 프로젝트에서 JPA를 사용하기 위해 EntityManager를 설정합니다. 그리고 Spring Container에서 동작하는 JPA의 기능을 활용하고, 스프링이 제공하는 일관성 있는 데이터 액세스 기술의 접근 방법을 적용할 수 있도록 LocalContainerEntityManager를 생성합니다.

 

 

AbstractRoutingDataSource는 조회 키를 기반으로 다양한 대상 데이터 소스 중 하나로 호출을 라우팅하는 DataSource의 추상 구현체입니다. AbstractRoutingDataSource는 현재 컨텍스트를 기반으로 실제 데이터 소스를 동적으로 결정하는 방법을 제공하기 위해 Spring 2.0.1 버전에 도입되었습니다. 컨텍스트 변경을 통해 전환되는 여러 데이터 소스의 맵을 유지 관리합니다.

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DetermineRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return TestThreadLocal.getDatabaseCluster();
    }
}
import org.springframework.stereotype.Component;

@Component
public class TestThreadLocal {

    public static final ThreadLocal<DatabaseCluster> contextHolder = new ThreadLocal<>();

    public static DatabaseCluster getDatabaseCluster() {
        return contextHolder.get();
    }

    public static void setDatabaseCluster(DatabaseCluster databaseCluster) {
        contextHolder.set(databaseCluster);
    }

    public static void clear() {
        contextHolder.remove();
    }
}

https://www.websparrow.org/spring/spring-boot-dynamic-datasource-routing-using-abstractroutingdatasource

 

현재 개발중인 애플리케이션은 Multi Datasource를 사용중이다. 

Header로 전달받은 값에 따라 연결할 데이터베이스를 결정하고, 경우에 따라선 한 API에서 2개 이상의 데이터베이스에 접근해야 한다. JPA를 사용중이고, 몇 십개의 엔티티들은 대부분 다 관계성을 띄고 있어 FetchType을 기본적으로 Lazy Association으로 가져간다.

 

OSIV (Open Session In View)

OSIV는 개발중에 처음 알게된 용어이다. 엔티티들의 수많은 관계속에서 딱 원하는 Response 데이터 형태를 만들어내기란 여간 힘든게 아니었다. 삽질의 시간속에서 허우적 댈 때쯤 Service Layer에서 CRUD중 C/U/D의 경우에는 @Transactional로 선언된 메소드들이 있었다. 그런데 @Transactional 메소들를 벗어난 이후의 동작이 예상대로 돌아가지 않는 것이다. 특히 데이터베이스 각각에 Update를 해야되는 경우였는데 (그 외에도 몇개의 문제상황이 있었다) A데이터베이스에서 쿼리를 하고, B데이터베이스에다 작업을 하기 위해 MultiDataSource를 사용하기 위해 이용되는 DatabaseContextHolder에 B데이터베이스를 재할당하였다. 당시 나는 당연히 B로 연결이 될 것이라 생각했는데 이게 웬걸 B 데이터베이스로 연결되지 않고 처음에 맺었던 A 데이터베이스에다가 모든 작업을 하고 있던게 아닌가. A에 하는 작업과 B에 하는 작업을 메소드 분리하고, 각각의 메소드에 @Transactional을 걸어줘봤는데 실패, propagation을 requires_new 로도 해보고 여러가지를 시도해봤는데도 실패. 그렇게 삽질을 하고 나서야 알게된게 OSIV였다.

 

일단 @Transactionl에 대해 알아보고, OSIV를 알아보도록 하자

 

  • @Transactional
    • 스프링에서는 간단하게 어노테이션 방식으로 @Transactional을 메소드, 클래스, 인터페이스 위에 추가하여 사용하는 방식이 일반적이다. 이 방식을 선언적 트랜잭션이라 부르며, 적용된 범위에서는 트랜잭션 기능이 포함된 프록시 객체가 생성되어 자동으로 commit 혹은 rollback을 진행해준다.
    • Spring이 제공하는 어노테이션으로 @Transactional을 메서드 또는 클래스에 명시하게 되면 특정 메서드 또는 클래스가 제공하는 모든 메서드에 대해 내부적으로 AOP를 통해 트랜잭션 처리코드가 전 후 로 수행된다.
      • AOP는 일반적으로 두가지 방식이 있는데 그 중에 하나인 Dynamic Proxy로 예를 들어보면
        • @Transactional가 걸려있는 메소드 a() 가 있으면 Proxy는 런타임 시점에 해당 a()메소드를 호출하는 코드를 포함, 트랜잭션 처리에 필요한 코드를 전후로 감싸 트랜잭션 처리를 다이나믹 프록시 객체에 대신 위임한다. 
    • 하지만, @Transactional이 걸려있는 메소드가 종료됟더라도 OSIV에 의해 트랜잭션이 끝나도 영속성 컨텍스트는 유지된다.
  • OSIV와 영속성컨텍스트
    • OSIV(Open Session In View)는 영속성 컨텍스트를 뷰까지 열어두는 기능이다. 영속성 컨텍스트가 유지되면 엔티티도 영속 상태로 유지된다. 뷰까지 영속성 컨텍스트가 살아있다면 뷰에서도 지연 로딩을 사용할 수가 있다.
    • OSIV 전략은 트랜잭션 시작처럼 최초 데이터베이스 커넥션 시작 시점부터 API 응답이 끝날 때 까지 영속성 컨텍스트와 데이터베이스 커넥션을 유지한다. 그래서 View Template이나 API 컨트롤러에서 지연 로딩이 가능하다.
    • 스프링에서는 OSIV가 기본값(true)으로 설정되어있다. 기본적으로는 트랜잭션을 시작할 때 영속성 컨텍스트가 DB 커넥션을 가져온다. 커넥션을 획득한 후에는 API 응답이 끝날때까지 유지한다. 트랜잭션이 끝나도 지연 로딩으로 프록시 객체를 초기화할 상황이 생기기 때문이다. 따라서 영속성 컨텍스트가 DB 커넥션을 계속 물고 있어야 한다.
    • JPA의 영속성 컨텍스트는 결국 DB를 1:1로 쓰면서 동작한다.
  • OSIV의 문제점?
    • 오랫동안 DB 커넥션을 물고 있기 때문에 실시간 트래픽이 중요한 애플리케이션에서는 커넥션이 모자랄 수 있다. 이는 결국 장애로 이어진다.
    • Multi Datasource를 사용하는 경우에는 처음 맺은 DB 커넥션을 계속 가져가기 때문에 DB Connection을 바꾸는 데에 있어 문제가 생길 수 있다.
  • OSIV = false
    • OSIV를 끄면 트랜잭션을 종료할 때 영속성 컨텍스트를 닫고, 데이터베이스 커넥션도 반환한다. 따라서 커넥션 리소스를 낭비하지 않는다. 하지만 모든 지연로딩을 트랜잭션 안에서 처리해야 한다. 따라서 지금까지 작성한 많은 지연 로딩 코드를 트랜잭션 안으로 넣어야 하는 단점이 있다. 그리고 view template에서 지연로딩이 동작하지 않는다.
    • CRUD 중에서 CUD의 경우에는 기본적으로 서비스레이어에 @Transactional을 설정하였고, Controller에는 비즈니스 로직이 없기 때문에 문제가 없을 수 있지만 R의 경우는 서비스레이어에서 Lazy Loading을 이용해 엔티티를 조작하려고 할 때 Exception이 발생하기 때문에 이 경우에는 @Transational과 read-only값을 true로 설정함으로써 해결이 가능하다.
  • OSIV=false & MultiDatasource
    • OSIV를 true로 하게 되면 최초 연결된 세션을 재사용하여 DetermineRoutingDatasource 클래스의 determineCurrentLookupKey 메소드가 동작하지 않게 되고, 결과적으로 DatabaseContextHolder로 재설정한 데이터베이스로 연결이 되지 않는다. 따라서 MultiDatasource를 사용하는 애플리케이션의 경우엔 OSIV를 false로 가져가는 것이 맞다. 

 

Datasoure를 변경하려고 하면 최초 연결된 세션을 재사용하기 때문에  DetermineRoutingDatasource 클래스의determineCurrentLookupKey 메소드가 동작하지 않게 되고, 결과적으로 DatabaseContextHolder로 재설정한 데이터베이스로 연결이 되지 않았다. 이 문제는 OSIV 를 false로 둠으로써 해결할 수 있었다. 

OSIV는 Spring에서 기본적으로 true로 가져가고 있지만, 애플리케이션의 복잡도와 성능을 고려해보고 이 값을 false로 변경해보는 것을 고려해보아야 한다.

 

 

참고

https://dodeon.gitbook.io/study/kimyounghan-spring-boot-and-jpa-optimization/04-osiv

https://ykh6242.tistory.com/102

https://incheol-jung.gitbook.io/docs/q-and-a/spring/osiv

 

+ Recent posts