ec2 instance에 mvn 설치를 했는데 java version이 1.7로 잡힌다...

pom.xml에 target설정도 했는데 package를 하면 계속 에러가...하..

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.282.b08-1.61.amzn1.x86_64/jre

를 통해서 mvn의 java version을 바꿔주니까 제대로 된다^^;;;;

  • Batch 
    • 일괄처리
  • 사용 예
    • 엄청나게 큰 데이터를 가공해야 하는데 예를 들어 그 작업을 하루에 1번 정도 해줘야 한다. 
      • 엄청나게 큰 데이터를 가공해야 한다 ? 
        • CPU, I/O 등의 자원 처리에 서버 부하가 발생 
        • Request 처리를 올바르게 할 수 없다. 
        • 데이터 처리 중에 실패 발생하면, 다시 처음부터 하는게 아니라 끝난 지점부터하는게 좋음 (5만1번째)
      • 하루에 딱 1번 한다 ?
        • 하루에 한번만 돌게 하는 것을 구현하려고 API를 구성하는 것은 낭비일 수 있음 
        • 다른 사람이 한번 더 돌리는 실수를 막아야 함
  • SpringBatch는
    • '단발성으로 대용량의 데이터를 처리하는 어플리케이션' 이다.
      • Spring MVC를 사용함으로써 비즈니스 로직에 최대한 집중할 수 있었던 것 처럼 Spring Batch를 이용하면 배치 어플리케이션을 동작하는 데 필요한 로직에만 집중할 수 있다.
      • 예를 들어, 이미 실행된 경우 재실행을 불가능 하게 한다거나, 실패된 지점에서 부터 다시 동작시키게 할 수 있다거나... 이런 기능들을 SpringBatch가 쉽게 할 수 있도록 도와준다. 
      •  
  • Batch Application의 조건
    • 대용량 데이터 - 대용량의 데이터를 가져오거나 전달, 계산 등의 처리를 함 
    • 자동화 - 사용자 개입을 최소로 줄여 최대한 자동으로 동작될 수 있도록 한다.
    • 견고성 - 잘못된 데이터를 충돌/중단 없이 처리
    • 신뢰성 - 잘못된 부분을 추적할 수 있음 (로깅 / 알림)
    • 성능 - 지정된 시간 내에 처리, 다른 어플리케이션을 방해하지 않는다.

아래의 그림과는 표현한 방식이 살짝 다르다.
Job과 Step은 작업의 최소 단위,  Step을 통해 데이터 읽기/가공/결과기록 등의 로직을 묶어서 관리한다. Chunk는 한 번의 operation을 통해 다룰 데이터의 집합으로 각 Step은 설정에 정의된 Chunk 단위에 따라 데이터를 읽어 들이고 가공한 후 기록한다. Step의 설정에서 Chunk의 크기를 결정하는데 이는 데이터베이스의 한 row일 수도 있고, CSV 파일의 한 줄일 수도 있다. Chunk는 트랜잭션 관리 단위이기도 하여 마지막 처리 시점에 TX를 커밋하거나 롤백한다. ItemReader<Input> 은 작업 대상이 될 데이터를 읽어 들이는 컴포넌트이다. ItemProcessor<Input,Output>은 로직에 따라 가공하고, 변경된 데이터를 리턴한다. ItemWriter<Output>은 완성된 데이터를 정해진 곳에 저장/색인한다.

 

  • Batch Application 생성 - Simple ver.

Spring Initializer를 이용해 위의 세팅으로 프로젝트 생성

 

SpringBatch의 여러 기능을 쓰려면 필수로 추가해야하는 어노테이션
Job 생성과 Step 생성

run을 해보면..

 

log.info(">>>>> This is Step1") 가 잘 수행되어있음

 

 

 

 

 

  • Batch Application 생성 - hard ver.
    • PostgreSQL 환경에서 Spring Batch 실행해보기 
    • 먼저, Spring Batch를 작동하기 위해 필요한 메타 데이터 테이블을 세팅한다. 
      • Spring Batch 의 메타데이터는..
        • 이전에 실행한 Job에 대한 기록
        • 실패한 Batch Parameter 기록, 성공한 Job 기록
        • 재실행시 시작해야 될 포인트 기록
        • Job이 갖고 있는 Step에 대한 Histroy tracking (성공/실패 Step 기록)

소스 라이브러리에서 spring-batch-core 디렉토리 안에 있다.

 

더보기

-- Autogenerated: do not edit this file

CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL TIMESTAMP DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME TIMESTAMP NOT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;

위의 쿼리문을 이용해 table을 생성해준다. 나는 AWS RDS를 이용해 PostgreSQL을 설치해 주었다. 

 

application.yml에 datasource 설정 완료

 

 

참조

medium.com/myrealtrip-product/spring-batch-%EC%B2%98%EC%9D%8C%EB%B6%80%ED%84%B0-%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0-3c6a5db0646d

 

Spring Batch, 처음부터 시작하기

커머스 서비스 개발자를 위한 Spring Batch 입문

medium.com

jojoldu.tistory.com/325?category=902551

 

2. Spring Batch 가이드 - Batch Job 실행해보기

이번 시간에는 간단한 Spring Batch Job을 생성 & 실행하면서 전반적인 내용을 공부해보겠습니다. 작업한 모든 코드는 Github에 있으니 참고하시면 됩니다. 2-1. Spring Batch 프로젝트 생성하기 기본적인

jojoldu.tistory.com

 

medium.com/swlh/from-java-8-to-java-15-in-ten-minutes-f42d422a581e

 

From Java 8 to Java 15 in Ten Minutes

This blog will give you samples of awesome new feature added since Java 7. I’ll showcase at least one major improvement for each Java…

medium.com

 

좋은 자료인 것 같아 저장해두고 하나씩 적용해봐야지

  • AWS STS를 사용하면 AWS 리소스에 대한 액세스를 제어할 수 있는 임시 보안 자격 증명을 생성하여 특정 사용자에게 제공할 수 있다. 
    • 단기적인 자격으로 애플리케이션에 장기 AWS 보안 자격 증명을 배포 또는 포함할 필요가 없다. 
    • 사용자 AWS자격 증명을 제공하지 않아도 AWS 리소스에 대한 액세스 권한을 사용자에게 제공할 수 있다. 
    • 수명이 제한되어 있어 만료된 경우 새로 요청해야 한다. 
  • AssumeRole
    • 엑세스 권한이 없는 AWS 리소스에 액세스 할 수 있도록 허용하는 데 유용 
      • 다른 AWS 계정의 리소스에 액세스해야 할 수 있다. 
    • 호출할 때 필요한 정보 
      • ARN (리소스 정보)
      • 기간 (임시 보안 자격 증명의 기간)
      • 세션 이름 
    • 역할을 만들어 IAM 사용자에게 권한 위임
      • IAM 역할을 사용해 권한을 줄 계정과 권한을 받을 계정 간에 관계 설정이 가능 
      • 신뢰받는 계정은 리소스에 대한 엑세스가 필요한 사용자를 저장 
      • 권한을 받은 계정의 애플리케이션은 AWS STS의 AssumeRole API 작업을 사용할 수 있음 
      • 권한을 받을 계정이 내가 제어하지 않는 AWS 계정인 경우 externalId 속성을 사용해야 한다. 

예를 들어 Example Corp이라는 타사를 고용해 AWS 계정을 모니터링하고 비용을 최적화하기로 했다고 가정해봅시다. 일일 경비를 추적하기 위해 Example Corp은 AWS 리소스에 접근해야 합니다. Example Corp 역시 다른 고객을 위해 다른 많은 AWS 계정을 모니터링합니다.

IAM 사용자 및 AWS 계정의 장기 자격 증명에 대한 액세스 권한을 Example Corp에게 제공하지 마십시오. 대신 IAM 역할과 임시 보안 자격 증명을 사용합니다. IAM 역할은 장기 자격 증명(예: IAM 사용자의 액세스 키)을 공유하지 않고도 AWS 리소스에 액세스할 수 있도록 허용하는 메커니즘을 타사에게 제공합니다.

IAM 역할을 사용하여 AWS 계정과 Example Corp 계정 사이에 신뢰 받는 관계를 설정할 수 있습니다. 이 관계가 설정된 후 Example Corp 계정의 멤버는 AWS STS AssumeRole API를 호출하여 임시 보안 자격 증명을 얻을 수 있습니다. Example Corp 멤버는 자격 증명을 사용하여 계정의 AWS 리소스에 액세스할 수 있습니다.

 

  1. Example Corp을 고용해 고유한 사용자 지정 식별자를 생성하도록 합니다. 이 고유 고객 ID와 AWS 계정 번호를 제공합니다. 이 정보는 다음 단계에서 IAM 역할을 생성하는 데 필요합니다.

    참고

    이 식별자가 Example Corp의 각 고객에게 고유한 것이라면 Example Corp는 ExternalId에 대해 그들이 원하는 어떤 문자열 값이라도 사용할 수 있습니다. 두 고객이 같은 값을 갖지 않는 한, 고객 계정 번호 또는 임의 문자열이 될 수 있습니다. 이는 '보안 유지'를 위한 것은 아닙니다. Example Corp은 각 고객에게 ExternalId 값을 제공해야 합니다. 가장 중요한 것은 그들의 고객이 아닌 Example Corp이 그것을 생성해야 한다는 것입니다.

  2. AWS에 로그인해 Example Corp에 리소스에 대한 액세스 권한을 부여하는 IAM 역할을 생성합니다. IAM 역할과 마찬가지로 해당 역할에도 권한 정책과 신뢰 정책이라는 2가지 정책이 있습니다. 그 역할의 신뢰 정책은 역할을 위임할 사용자를 지정합니다. 이 예시 시나리오에서 정책은 Example Corp의 AWS 계정 번호를 Principal로 지정합니다. 이렇게 하면 계정의 자격 증명이 그 역할을 수임하도록 허용합니다. 또한, Condition 요소를 신뢰 정책에 추가합니다. 이 Condition은 Example Corp의 고유 고객 ID와 일치하는지 확인하기 위해 ExternalId 컨텍스트 키를 테스트합니다. 예를 들면 다음과 같습니다.

     

    "Principal": {"AWS": "Example Corp's AWS Account ID"}, "Condition": {"StringEquals": {"sts:ExternalId": "Unique ID Assigned by Example Corp"}}
  3. 역할에 대한 권한 정책은 해당 역할이 누군가가 수행하도록 허용할 수 있는 작업을 지정합니다. 예를 들어 그 역할은 누군가에게 IAM 사용자나 그룹이 아닌 Amazon EC2 또는 Amazon RDS 리소스만을 관리할 수 있게 허용하도록 지정할 수 있습니다. 이 예시 시나리오에서는 권한 정책을 사용하여 Example Corp에게 계정의 리소스 전체에 대한 읽기 전용 액세스 권한을 부여합니다.

  4. 역할을 정의한 후에는 역할의 Amazon 리소스 이름(ARN)을 Example Corp에 제공합니다.

  5. Example Corp이 AWS 리소스에 액세스해야 할 때는 그 회사의 누군가가 AWSsts:AssumeRole API를 호출합니다. 이 호출에는 수임할 역할의 ARN과 사용자 지정 ID에 해당하는 ExternalId 파라미터가 포함되어 있습니다.

 

docs.aws.amazon.com/ko_kr/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html

 

AWS 리소스에 대한 액세스를 서드 파티에 부여할 때 외부 ID를 사용하는 방법 - AWS Identity and Access M

이 식별자가 Example Corp의 각 고객에게 고유한 것이라면 Example Corp는 ExternalId에 대해 그들이 원하는 어떤 문자열 값이라도 사용할 수 있습니다. 두 고객이 같은 값을 갖지 않는 한, 고객 계정 번호

docs.aws.amazon.com

 

현재 진행하고 있는 프로젝트에서 데이터 처리하는 데 시간이 많이 소요되는 부분이 있어 

이것을 개선하고자 관련해서 공부를 해보려 한다. 

 

우선 내가 처한 상황은,

AWS ComputeOptimizer 서비스 API를 사용하려는데 모든 Region에 대한 데이터가 필요함.

그런데 각 Region별로 Client를 생성해서 API 호출 및 데이터 변환에 많은 시간이 소요.

따라서, 해당 부분의 성능을 향상시켜 페이지 로딩시 데이터를 최대한 빨리 가져오도록 하려 한다. 

 

 

먼저 기초부터 알고 가자!

  • JVM
    • 자바 애플리케이션을 클래스 로더를 통해 읽어 들여 자바 API와 함께 실행하는 역할 
    • JAVA와 OS 사이에서 중개자 역할을 수행하여, OS에 구애받지 않고 사용할 수 있게 해준다.
    • 메모리관리, GarbageCollection을 수행한다. 
    • 스택기반의 가상머신
  • Java Program 실행과정 
    • 프로그램이 실행되면 JVM이 OS로부터 메모리를 할당받음 
      • JVM은 이 메모리를 용도에 따라 여러 영역으로 나누어 관리 (아래 참고)
    • 자바 컴파일러가 자바 소스코드를 읽어들여 바이트코드(.class)로 변환 
    • Class Loader를 통해 class 파일들을 JVM으로 로딩 
    • Execution engine을 통해 class파일 해석
    • 해석된 바이트코드는 Runtime Data Areas에 배치되고 실행된다.
    • JVM은 필요에 따라 GC 와 같은 관리 작업을 수행한다. 

출처 : asfirstalways.tistory.com/158

HEAP

 

  • JVM 메모리 구조(RuntimeDataAreas)
    • Thread별로 생성되는 데이터 영역
      • pc register
        • Thread가 어떤 부분을 어떤 명령으로 실행해야 할 지 기록한다.
      • stack
        • 로컬 변수, 일부 실행 결과, 메소드 호출 또는 반환 등을 저장
        • 메소드 호출 시마다 각각의 스택 프레임이 생성되고, 메소드 수행이 끝나면 삭제됨
        • 메소드 안에서 사용되는 값을 저장 
      • native method stack
    • 전체 Thread가 공유하는 데이터 영역
      • heap
        • 객체를 저장하는 영역, 할당된 객체 해제는 Garbage Collector에 의해서만 가능 
        • Permanent Generation
          • 객체 주소값 저장
        • New/Young
          • Eden : 객체 최초 생성 공간
          • Survivor 0/1 : Eden에서 참조 되는 객체들이 저장되는 공간
        • Old
          • New area에서 일정 시간 참조되고 있는 객체들이 저장되는 공간
      • method area
        • 클래스 정보를 처음 메모리 공간에 올릴 때 초기화되는 대상을 저장하기 위한 메모리 공간
        • 클래스의 필드, 메소드 정보, static 변수, 메소드와 생성자의 바이트 코드, 클래스, 인터페이스와 관련된 Runtime Constant Pool이 저장되며, Garbage Collector 대상이 아님

출처 : hongsii.github.io/2018/12/20/jvm-memory-structure/

 

 

  • Thread
    • 프로세스 내부에 존재하는 프로세스보다 더 작은 작업의 단위
    • 한 프로세스 내에서 데이터를 병렬적으로 처리하고 각 쓰레드의 결과를 합쳐서 사용하게 되면 데이터 처리를 하는 데 시간을 절약할 수 있다. 
    • 생성방법
      • Runnable 인터페이스 사용
        • run() 메소드를 구현하여 Thread 사용할 수 있음
      • Thread 클래스 사용 (Runnable 클래스 구현한 것)
    • 주의해야 할 점
      • 운영체제가 지원하는 스레드 수를 초과해 사용하면 자바 애플리케이션이 예상치 못한 방식으로 크래시될 수 있으므로 기존스레드가 실행되는 상태에서 계속 새로운 스레드를 만드는 상황이 일어나지 않도록 주의해야 한다. 
  • ThreadPool
    • 하드웨어에 맞는 수의 태스크를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버헤드 없이 제출할 수 있다. 
    • 자바 ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공
    • newFixedThreadPool 메서드를 이용해 스레드 풀을 만든다. 
      • 이 메서드는 워커 스레드라 불리는 ExcecutorService를 만들고 이들은 스레드 풀에 저장된다. 
    • 스레드풀에서 사용하지 않은 스레드로 제출된 태스크를 먼저 온 순서대로 실행하고, 실행이 종료되면 이들 스레드는 풀로 반환된다. 
    • 스레드 풀을 사용할 때 주의해야 할 점
      • 잠을 자거나 I/O를 기다리거나 네트워크 연결을 기다리는 테스크가 있다면 주의하기 
      • 처음 제출한 태스크가 기존 실행 중인 태스크가 나중의 테스크 제출을 기다리는 상황 피하기 (데드락)
      • 프로그램을 종료하기 전에 모든 스레드 풀을 종료해야 함
  • Multi Thread
    • Stack 영역을 스레드 갯수 만큼 분할한다.
    • 하나의 프로그램에서 여러 개의 실행 흐름을 만들고 실행하게 한다.  

 

이제 본격적으로 자바 8 API에 추가된 Stream부터 알아보도록 하자.

  • Stream
    • Stream vs Collection
      • 데이터 계산 방법
        • 현재 자료구조가 포함하는 모든 값을 메모리에 저장하는 컬렉션과 달리 스트림은 이론적으로 요청할 때만 요소를 계산한다.
        • 사용자가 요청하는 값만 스트림에서 추출한다는 것이 핵심
      • 데이터 반복 처리 방법
        • 컬렉션은 사용자가 직접 요소를 반복하는 외부 반복
          • "미영아 상자안에 뭐 있어?" "인형" "인형 가져올래? 그리도 뭐있어?" "과자" "과자 가져올래? 그리고 뭐있어?" "양말" "양말 가져올래 그리고 뭐있어?" "이제 없어" "그럼 됐어"
        • 스트림 라이브러리는 반복을 알아서 처리하고 결과 스트림값을 어딘가에 저장해주는 내부반복 
          • "미영아 상자안에 있는 거 다가져와"

  • 병렬스트림 (parallelStream)
    • ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행한다. compute 메서드는 병렬로 실행할 수 있을 만큼 태스크의 크기가 충분히 작아졌는지 확인하고 크면 두 개의 새로운 ForkJoinSumCalculator로 할당, 이 과정이 반복되어 분할을 반복하고 서브 태스크들이 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트에서 역순으로 방문하여 부분 결과를 합쳐 최종 결과를 계산한다.  
    • 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스티림
    • 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다. 
    • 병렬스트림에서 사용하는 스레드 풀 설정
      • 병렬 스트림은 내부적으로 ForkJoinPool을 사용
        • ForkJoinPool
          • 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService인터페이스를 구현한다.  
          • 쓰레드풀 서비스의 일종으로 fork를 통해 task를 분담하고 join을 통해 합침
          • 기본적으로 ExecutorService의 구현체지만 다른 점은 thread들이 개별 큐를 가지며 자신에게 아무런 task가 없으면 다른 Thread의 task를 가져와 처리하여 CPU 자원이 늘지 않고 최적의 성능을 낼 수 있다. 
      • 프로세서수가 반환하는 값에 상응하는 스레드를 가진다.(기기의프로세서수)
        • Runtime.getRuntime().availableProcessors();
    • 언제 사용하나? 
      • 전체 사이즈를 아는 array, arrayList와 같은 경우 (분할이 잘 이루어지는 데이터 구조)
      • 병렬로 처리되는 작업이 독립적인 경우 (stream의 중간 단계 연산 중 sorted() 혹은 distinct() 사용 XX)
        • 공유된 가변 상태 피해야 함
      • 멀티코어 간의 데이토 이동은 비싸기에 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 
        • 스트림을 재귀적으로 분할하고, 각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합치는 것보다도 더 시간이 오래 걸리는 작업!
      • limit이나 findFirst처럼 요소의 순서에 의존하는 연산이 없을 때 
        • findFirst대신 findAny는 괜찮다.
      • 병합과정 (Collector나 Combilner) 비용이 비싸면 안됨 (성능측정해보기)
    • 주의해야 할 점 
      • 공유된 thread pool을 사용하기 때문에 성능장애를 만들 수 있음 

fork-join model

 

  • Future (자바5)
    • Future과 CompletableFuture의 관계는 Collection과 Stream의 관계에 비유할 수 있는데, 자바 8에서 새로 제공하는 CompletableFuture클래스는 Future의 기능을 선언형으로 이용할 수 있도록 한 것이다.
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
	public Double call() {
    		return 시간이 오래걸리는 작업();
    }
});

doSomthingElse();

try {
	Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
	//계산 중 예외 발생
} catch (InterruptedException ie) {
	//현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException te) {
	//Future완성되기 전에 타임아웃 발생
}

 

 

  • CompletableFuture - 안정적 비동기 프로그래밍
    • 언제 사용하나 ? 
      • 페이스북의 데이터를 기다리는 동안 트위터 데이터를 처리하고 싶다!
      • 병렬성이 아니라 동시성을 필요로 하는 상황 (조금식 연관된 작업을 같은 CPU에서 동작하는 것)
    • 동시성은 프로그래밍 속성이고 병렬성은 하드웨어 수준에서 지원 
    • 병렬 스트림과의 비교 
      • 병렬 스트림 버전에서는 검색해야할 대상이 스레드 수 보다 많아지는 경우 실행중인 스레드가 종료되길 기다려야 한다. 
      • CompletableFuture는 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다.
        • 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다. 
    • 병렬 스트림과 CompletableFuture  둘 중 어떤 것을 사용해야 할까?
      • I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 병렬 스트림
      • I/O를 기다리는 작업을 병렬로 실행하고, 스레드 수를 적절하게 설정해야 하는 경우 
public Future<Double> getPriceAsync(String product) {
	CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            try {
                double price = 오래 걸리는 작업 메소드();
                futurePrice.complete();
            } catch (Exception ex) {
                futurePrice.completeExceptionally(ex); // Future에 에러 포함해서 종료
            }
        }).start();
        return futurePrice;
}

Future 사용 Example

 

Future<Double> futurePrice = shop.getPriceAsync("dd");

다른 작업 메소드 (); // 가격 가져오게 하고 다른 작업 하기

try {
	double price = futurePrice.get();
} catch (Exception e) {
	throw RuntimeException(e);
}

비동기 메소드 구현 및 사용

 

public Future<Double> getPriceAsync(String product) {
	return CompletableFuture.supplyAsync(() -> 오래 걸리는 작업 메소드(product));
}

CompletableFuture를 직접 만들지 않고 간단하게 만드는 방법 

 

private final Executor executor = 
	Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
   		public Thread newThread(Runnable r){
        	Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
            
        }
    }
});

애플리케이션에 맞는 커스텀 Executor

 

public List<String> findPrices(String product) {
	List<CompletableFuture<String>> priceFutures = 
      shop.stream()
      .map(shop -> CompletableFuture.supplayAsync(() -> shop.getPrice(product), executor))
      .map(future -> future.thenApply(Quote::parse))
      .map(future -> future.thenCompose(quote -> 
      			CompletableFuture.supplyAsync(()-> Discount.applyDiscount(quote), executor)))
      .collect(toList());
      
      
	return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}

동기 작업과 비동기 작업 조합하기

 

Future<Double> futurePriceInUSD = 
	CompletableFuture.supplyAsync(() -> shop.getPrice(product))
    				 .thenCombine(CompletableFuture.supplyAsync(() -> 메소드())
                     	.completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
                    	 (price, rate) -> price * rate ))
                     .orTimeout(3, TimeUnit.SECONDS);
                        

타임아웃 사용하기 

 

 

출처 1 m.blog.naver.com/PostView.nhn?blogId=tmondev&logNo=220945933678&proxyReferer=https:%2F%2Fwww.google.com%2F

출처 2 모던 자바 인 액션 (라울-게이브리얼 우르마, 마리오 푸스코, 앨런 마이크로프트 지음 / 한빛미디어)

 

 

나의 결론, 

우선 나는 AWS에서 Region 별로 API를 호출해야 하고, 각각의 결과를 바탕으로 또 한번 API를 각각 호출해야 한다. 

따라서 단순 계산식이 아니기에 CompleteFuture를 사용하고, ThreadPool크기를 Region숫자로 fix할 예정이다. 

또한 map을 활용하여 비동기 작업과 조합하는 방법으로 개발하겠다. 

 

 

+ Recent posts