엄청나게 큰 데이터를 가공해야 하는데 예를 들어 그 작업을 하루에 1번 정도 해줘야 한다.
엄청나게 큰 데이터를 가공해야 한다 ?
CPU, I/O 등의 자원 처리에 서버 부하가 발생
Request 처리를 올바르게 할 수 없다.
데이터 처리 중에 실패 발생하면, 다시 처음부터 하는게 아니라 끝난 지점부터하는게 좋음 (5만1번째)
하루에 딱 1번 한다 ?
하루에 한번만 돌게 하는 것을 구현하려고 API를 구성하는 것은 낭비일 수 있음
다른 사람이 한번 더 돌리는 실수를 막아야 함
SpringBatch는
'단발성으로 대용량의 데이터를 처리하는 어플리케이션' 이다.
Spring MVC를 사용함으로써 비즈니스 로직에 최대한 집중할 수 있었던 것 처럼 Spring Batch를 이용하면 배치 어플리케이션을 동작하는 데 필요한 로직에만 집중할 수 있다.
예를 들어, 이미 실행된 경우 재실행을 불가능 하게 한다거나, 실패된 지점에서 부터 다시 동작시키게 할 수 있다거나... 이런 기능들을 SpringBatch가 쉽게 할 수 있도록 도와준다.
Batch Application의 조건
대용량 데이터 - 대용량의 데이터를 가져오거나 전달, 계산 등의 처리를 함
자동화 - 사용자 개입을 최소로 줄여 최대한 자동으로 동작될 수 있도록 한다.
견고성 - 잘못된 데이터를 충돌/중단 없이 처리
신뢰성 - 잘못된 부분을 추적할 수 있음 (로깅 / 알림)
성능 - 지정된 시간 내에 처리, 다른 어플리케이션을 방해하지 않는다.
아래의 그림과는 표현한 방식이 살짝 다르다.Job과 Step은 작업의 최소 단위, Step을 통해 데이터 읽기/가공/결과기록 등의 로직을 묶어서 관리한다. Chunk는 한 번의 operation을 통해 다룰 데이터의 집합으로 각 Step은 설정에 정의된 Chunk 단위에 따라 데이터를 읽어 들이고 가공한 후 기록한다. Step의 설정에서 Chunk의 크기를 결정하는데 이는 데이터베이스의 한 row일 수도 있고, CSV 파일의 한 줄일 수도 있다. Chunk는 트랜잭션 관리 단위이기도 하여 마지막 처리 시점에 TX를 커밋하거나 롤백한다. ItemReader<Input> 은 작업 대상이 될 데이터를 읽어 들이는 컴포넌트이다. ItemProcessor<Input,Output>은 로직에 따라 가공하고, 변경된 데이터를 리턴한다. ItemWriter<Output>은 완성된 데이터를 정해진 곳에 저장/색인한다.
Batch Application 생성 - Simple ver.
Spring Initializer를 이용해 위의 세팅으로 프로젝트 생성
SpringBatch의 여러 기능을 쓰려면 필수로 추가해야하는 어노테이션Job 생성과 Step 생성
run을 해보면..
log.info(">>>>> This is Step1") 가 잘 수행되어있음
Batch Application 생성 - hard ver.
PostgreSQL 환경에서 Spring Batch 실행해보기
먼저, Spring Batch를 작동하기 위해 필요한 메타 데이터 테이블을 세팅한다.
Spring Batch 의 메타데이터는..
이전에 실행한 Job에 대한 기록
실패한 Batch Parameter 기록, 성공한 Job 기록
재실행시 시작해야 될 포인트 기록
Job이 갖고 있는 Step에 대한 Histroy tracking (성공/실패 Step 기록)
AWS STS를 사용하면 AWS 리소스에 대한 액세스를 제어할 수 있는 임시 보안 자격 증명을 생성하여 특정 사용자에게 제공할 수 있다.
단기적인 자격으로 애플리케이션에 장기 AWS 보안 자격 증명을 배포 또는 포함할 필요가 없다.
사용자 AWS자격 증명을 제공하지 않아도 AWS 리소스에 대한 액세스 권한을 사용자에게 제공할 수 있다.
수명이 제한되어 있어 만료된 경우 새로 요청해야 한다.
AssumeRole
엑세스 권한이 없는 AWS 리소스에 액세스 할 수 있도록 허용하는 데 유용
다른 AWS 계정의 리소스에 액세스해야 할 수 있다.
호출할 때 필요한 정보
ARN (리소스 정보)
기간 (임시 보안 자격 증명의 기간)
세션 이름
역할을 만들어 IAM 사용자에게 권한 위임
IAM 역할을 사용해 권한을 줄 계정과 권한을 받을 계정 간에 관계 설정이 가능
신뢰받는 계정은 리소스에 대한 엑세스가 필요한 사용자를 저장
권한을 받은 계정의 애플리케이션은 AWS STS의 AssumeRole API 작업을 사용할 수 있음
권한을 받을 계정이 내가 제어하지 않는 AWS 계정인 경우 externalId 속성을 사용해야 한다.
예를 들어 Example Corp이라는 타사를 고용해 AWS 계정을 모니터링하고 비용을 최적화하기로 했다고 가정해봅시다. 일일 경비를 추적하기 위해 Example Corp은 AWS 리소스에 접근해야 합니다. Example Corp 역시 다른 고객을 위해 다른 많은 AWS 계정을 모니터링합니다.
IAM 사용자 및 AWS 계정의 장기 자격 증명에 대한 액세스 권한을 Example Corp에게 제공하지 마십시오. 대신 IAM 역할과 임시 보안 자격 증명을 사용합니다. IAM 역할은 장기 자격 증명(예: IAM 사용자의 액세스 키)을 공유하지 않고도 AWS 리소스에 액세스할 수 있도록 허용하는 메커니즘을 타사에게 제공합니다.
IAM 역할을 사용하여 AWS 계정과 Example Corp 계정 사이에 신뢰 받는 관계를 설정할 수 있습니다. 이 관계가 설정된 후 Example Corp 계정의 멤버는 AWS STSAssumeRoleAPI를 호출하여 임시 보안 자격 증명을 얻을 수 있습니다. Example Corp 멤버는 자격 증명을 사용하여 계정의 AWS 리소스에 액세스할 수 있습니다.
Example Corp을 고용해 고유한 사용자 지정 식별자를 생성하도록 합니다. 이 고유 고객 ID와 AWS 계정 번호를 제공합니다. 이 정보는 다음 단계에서 IAM 역할을 생성하는 데 필요합니다.
참고
이 식별자가 Example Corp의 각 고객에게 고유한 것이라면 Example Corp는 ExternalId에 대해 그들이 원하는 어떤 문자열 값이라도 사용할 수 있습니다. 두 고객이 같은 값을 갖지 않는 한, 고객 계정 번호 또는 임의 문자열이 될 수 있습니다. 이는 '보안 유지'를 위한 것은 아닙니다. Example Corp은 각 고객에게 ExternalId 값을 제공해야 합니다. 가장 중요한 것은 그들의 고객이아닌Example Corp이 그것을 생성해야 한다는 것입니다.
AWS에 로그인해 Example Corp에 리소스에 대한 액세스 권한을 부여하는 IAM 역할을 생성합니다. IAM 역할과 마찬가지로 해당 역할에도 권한 정책과 신뢰 정책이라는 2가지 정책이 있습니다. 그 역할의 신뢰 정책은 역할을 위임할 사용자를 지정합니다. 이 예시 시나리오에서 정책은 Example Corp의 AWS 계정 번호를Principal로 지정합니다. 이렇게 하면 계정의 자격 증명이 그 역할을 수임하도록 허용합니다. 또한,Condition요소를 신뢰 정책에 추가합니다. 이Condition은 Example Corp의 고유 고객 ID와 일치하는지 확인하기 위해ExternalId컨텍스트 키를 테스트합니다. 예를 들면 다음과 같습니다.
"Principal": {"AWS": "Example Corp's AWS Account ID"}, "Condition": {"StringEquals": {"sts:ExternalId": "Unique ID Assigned by Example Corp"}}
역할에 대한 권한 정책은 해당 역할이 누군가가 수행하도록 허용할 수 있는 작업을 지정합니다. 예를 들어 그 역할은 누군가에게 IAM 사용자나 그룹이 아닌 Amazon EC2 또는 Amazon RDS 리소스만을 관리할 수 있게 허용하도록 지정할 수 있습니다. 이 예시 시나리오에서는 권한 정책을 사용하여 Example Corp에게 계정의 리소스 전체에 대한 읽기 전용 액세스 권한을 부여합니다.
역할을 정의한 후에는 역할의 Amazon 리소스 이름(ARN)을 Example Corp에 제공합니다.
Example Corp이 AWS 리소스에 액세스해야 할 때는 그 회사의 누군가가 AWSsts:AssumeRoleAPI를 호출합니다. 이 호출에는 수임할 역할의 ARN과 사용자 지정 ID에 해당하는 ExternalId 파라미터가 포함되어 있습니다.
ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행한다. compute 메서드는 병렬로 실행할 수 있을 만큼 태스크의 크기가 충분히 작아졌는지 확인하고 크면 두 개의 새로운 ForkJoinSumCalculator로 할당, 이 과정이 반복되어 분할을 반복하고 서브 태스크들이 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트에서 역순으로 방문하여 부분 결과를 합쳐 최종 결과를 계산한다.
각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스티림
모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
병렬스트림에서 사용하는 스레드 풀 설정
병렬 스트림은 내부적으로 ForkJoinPool을 사용
ForkJoinPool
서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService인터페이스를 구현한다.
쓰레드풀 서비스의 일종으로 fork를 통해 task를 분담하고 join을 통해 합침
기본적으로 ExecutorService의 구현체지만 다른 점은 thread들이 개별 큐를 가지며 자신에게 아무런 task가 없으면 다른 Thread의 task를 가져와 처리하여 CPU 자원이 늘지 않고 최적의 성능을 낼 수 있다.
프로세서수가 반환하는 값에 상응하는 스레드를 가진다.(기기의프로세서수)
Runtime.getRuntime().availableProcessors();
언제 사용하나?
전체 사이즈를 아는 array, arrayList와 같은 경우 (분할이 잘 이루어지는 데이터 구조)
병렬로 처리되는 작업이 독립적인 경우 (stream의 중간 단계 연산 중 sorted() 혹은 distinct() 사용 XX)
공유된 가변 상태 피해야 함
멀티코어 간의 데이토 이동은 비싸기에 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만
스트림을 재귀적으로 분할하고, 각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합치는 것보다도 더 시간이 오래 걸리는 작업!
limit이나 findFirst처럼 요소의 순서에 의존하는 연산이 없을 때
findFirst대신 findAny는 괜찮다.
병합과정 (Collector나 Combilner) 비용이 비싸면 안됨 (성능측정해보기)
주의해야 할 점
공유된 thread pool을 사용하기 때문에 성능장애를 만들 수 있음
fork-join model
Future (자바5)
Future과 CompletableFuture의 관계는 Collection과 Stream의 관계에 비유할 수 있는데, 자바 8에서 새로 제공하는 CompletableFuture클래스는 Future의 기능을 선언형으로 이용할 수 있도록 한 것이다.
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
return 시간이 오래걸리는 작업();
}
});
doSomthingElse();
try {
Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
//계산 중 예외 발생
} catch (InterruptedException ie) {
//현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException te) {
//Future완성되기 전에 타임아웃 발생
}
CompletableFuture - 안정적 비동기 프로그래밍
언제 사용하나 ?
페이스북의 데이터를 기다리는 동안 트위터 데이터를 처리하고 싶다!
병렬성이 아니라 동시성을 필요로 하는 상황 (조금식 연관된 작업을 같은 CPU에서 동작하는 것)
동시성은 프로그래밍 속성이고 병렬성은 하드웨어 수준에서 지원
병렬 스트림과의 비교
병렬 스트림 버전에서는 검색해야할 대상이 스레드 수 보다 많아지는 경우 실행중인 스레드가 종료되길 기다려야 한다.
CompletableFuture는 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다.
스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.
병렬 스트림과 CompletableFuture 둘 중 어떤 것을 사용해야 할까?
I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 병렬 스트림
I/O를 기다리는 작업을 병렬로 실행하고, 스레드 수를 적절하게 설정해야 하는 경우
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = 오래 걸리는 작업 메소드();
futurePrice.complete();
} catch (Exception ex) {
futurePrice.completeExceptionally(ex); // Future에 에러 포함해서 종료
}
}).start();
return futurePrice;
}
Future 사용 Example
Future<Double> futurePrice = shop.getPriceAsync("dd");
다른 작업 메소드 (); // 가격 가져오게 하고 다른 작업 하기
try {
double price = futurePrice.get();
} catch (Exception e) {
throw RuntimeException(e);
}
비동기 메소드 구현 및 사용
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> 오래 걸리는 작업 메소드(product));
}
CompletableFuture를 직접 만들지 않고 간단하게 만드는 방법
private final Executor executor =
Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
});
내가 맡은 부분은 서버 즉 백엔드 부분이므로, 설정한 언어에 맞게 Error Message를 클라이언트로 전달해야 했다.
기존 구 어드민 사이트에서는 .properties 파일에 Message들을 저장했지만 나는 DB에 저장하여 관리키로 함.
전략이라 할 것도 없지만.. 어쨌든 내가 가고자 하는 방향에 맞는 전략은 아래와 같다.
(내 전략의 방향이 맞는지, 그리고 이것을 구현한 방법이 맞는지 아직도 의문이다. 따라서 혹시라도 이것을 보시고 더 좋은 방법이나 틀린 부분이 있다고 느끼시는 분들이 있다면 Comment로 남겨주시면 제가 그것을 감사히 읽어보고 수정하도록 하겠습니다.)
- 에러 메시지는 DB에 저장
- 에러 메시지를 클라이언트로 전달할 때는 Caching되어진 데이터를 전달하고, 데이터는 locale과 messageKey 값으로 찾음
- Exception은 @RestControllerAdvice로 지정한 class에서 처리
@RestContorllerAdvice로 설정된 전역 ExceptionHandler가 클라이언트에서 전달한 Accept-Language Header값에 맞는 Exception Message로 전달하는 역할을 담당한다. 각 메소드는 Exception에 대한 MessageKey를 가지고 있어 Caching되어진 Exception Data에서 MessageKey와 Locale 정보에 맞는 Exception Message를 불러와서 Client로 전달한다. 단, 어떤 Exception 은 구체화된 에러 메시지를 내려줄 필요성이 있기에 이런 경우에는 exception을 throw할 때 overriding한 CustomException의 messageKey를 set해서 전달한다. 아래 로직은 @ExceptionHandler가 Exception을 Client에 전달하기 전에 수행하는 작업에 대한 내용이다.
@RestControllerAdvice
@Slf4j
public class AdminExceptionHandler {
@ExceptionHandler(ForbiddenRequestException.class)
public ResponseEntity<?> handleForbiddenRequestException(ForbiddenRequestException e, Locale locale) {
log.error(ExceptionUtils.getExceptionMessage(e));
String messageKey;
if (e.getMsgKey() != null) {
messageKey = e.getMsgKey();
} else {
messageKey = "forbidden.request";
}
return ApiMessage.builder().errorMessage(resource.getMessage(CacheKeys.builder().messageKey(messageKey).locale(locale.toString()).build())).status(e.getStatus()).build().toEntity();
}
}
그리고 아래 부분은 DefaultLocale 값을 설정하고, Accept-Language 값을 Locale로 설정하기 위해 구현한 내용이다.
나는 CookieLocaleResolver를 사용했다. 아래 설정을 마치면 @ExceptionHandler로 지정한 메소드로 전달한 Locale값이 Accept-Language 헤더값으로 잘 전달된다.