모던자바 데이터 처리와 성능 향상 (JVM메모리/Thread/Stream/병렬스트림/CompletableFuture)
현재 진행하고 있는 프로젝트에서 데이터 처리하는 데 시간이 많이 소요되는 부분이 있어
이것을 개선하고자 관련해서 공부를 해보려 한다.
우선 내가 처한 상황은,
AWS ComputeOptimizer 서비스 API를 사용하려는데 모든 Region에 대한 데이터가 필요함.
그런데 각 Region별로 Client를 생성해서 API 호출 및 데이터 변환에 많은 시간이 소요.
따라서, 해당 부분의 성능을 향상시켜 페이지 로딩시 데이터를 최대한 빨리 가져오도록 하려 한다.
먼저 기초부터 알고 가자!
- JVM
- 자바 애플리케이션을 클래스 로더를 통해 읽어 들여 자바 API와 함께 실행하는 역할
- JAVA와 OS 사이에서 중개자 역할을 수행하여, OS에 구애받지 않고 사용할 수 있게 해준다.
- 메모리관리, GarbageCollection을 수행한다.
- 스택기반의 가상머신
- Java Program 실행과정
- 프로그램이 실행되면 JVM이 OS로부터 메모리를 할당받음
- JVM은 이 메모리를 용도에 따라 여러 영역으로 나누어 관리 (아래 참고)
- 자바 컴파일러가 자바 소스코드를 읽어들여 바이트코드(.class)로 변환
- Class Loader를 통해 class 파일들을 JVM으로 로딩
- Execution engine을 통해 class파일 해석
- 해석된 바이트코드는 Runtime Data Areas에 배치되고 실행된다.
- JVM은 필요에 따라 GC 와 같은 관리 작업을 수행한다.
- 프로그램이 실행되면 JVM이 OS로부터 메모리를 할당받음
출처 : asfirstalways.tistory.com/158
- JVM 메모리 구조(RuntimeDataAreas)
- Thread별로 생성되는 데이터 영역
- pc register
- Thread가 어떤 부분을 어떤 명령으로 실행해야 할 지 기록한다.
- stack
- 로컬 변수, 일부 실행 결과, 메소드 호출 또는 반환 등을 저장
- 메소드 호출 시마다 각각의 스택 프레임이 생성되고, 메소드 수행이 끝나면 삭제됨
- 메소드 안에서 사용되는 값을 저장
- native method stack
- pc register
- 전체 Thread가 공유하는 데이터 영역
- heap
- 객체를 저장하는 영역, 할당된 객체 해제는 Garbage Collector에 의해서만 가능
- Permanent Generation
- 객체 주소값 저장
- New/Young
- Eden : 객체 최초 생성 공간
- Survivor 0/1 : Eden에서 참조 되는 객체들이 저장되는 공간
- Old
- New area에서 일정 시간 참조되고 있는 객체들이 저장되는 공간
- method area
- 클래스 정보를 처음 메모리 공간에 올릴 때 초기화되는 대상을 저장하기 위한 메모리 공간
- 클래스의 필드, 메소드 정보, static 변수, 메소드와 생성자의 바이트 코드, 클래스, 인터페이스와 관련된 Runtime Constant Pool이 저장되며, Garbage Collector 대상이 아님
- heap
- Thread별로 생성되는 데이터 영역
출처 : hongsii.github.io/2018/12/20/jvm-memory-structure/
- Thread
- 프로세스 내부에 존재하는 프로세스보다 더 작은 작업의 단위
- 한 프로세스 내에서 데이터를 병렬적으로 처리하고 각 쓰레드의 결과를 합쳐서 사용하게 되면 데이터 처리를 하는 데 시간을 절약할 수 있다.
- 생성방법
- Runnable 인터페이스 사용
- run() 메소드를 구현하여 Thread 사용할 수 있음
- Thread 클래스 사용 (Runnable 클래스 구현한 것)
- Runnable 인터페이스 사용
- 주의해야 할 점
- 운영체제가 지원하는 스레드 수를 초과해 사용하면 자바 애플리케이션이 예상치 못한 방식으로 크래시될 수 있으므로 기존스레드가 실행되는 상태에서 계속 새로운 스레드를 만드는 상황이 일어나지 않도록 주의해야 한다.
- ThreadPool
- 하드웨어에 맞는 수의 태스크를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버헤드 없이 제출할 수 있다.
- 자바 ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공
- newFixedThreadPool 메서드를 이용해 스레드 풀을 만든다.
- 이 메서드는 워커 스레드라 불리는 ExcecutorService를 만들고 이들은 스레드 풀에 저장된다.
- 스레드풀에서 사용하지 않은 스레드로 제출된 태스크를 먼저 온 순서대로 실행하고, 실행이 종료되면 이들 스레드는 풀로 반환된다.
- 스레드 풀을 사용할 때 주의해야 할 점
- 잠을 자거나 I/O를 기다리거나 네트워크 연결을 기다리는 테스크가 있다면 주의하기
- 처음 제출한 태스크가 기존 실행 중인 태스크가 나중의 테스크 제출을 기다리는 상황 피하기 (데드락)
- 프로그램을 종료하기 전에 모든 스레드 풀을 종료해야 함
- Multi Thread
- Stack 영역을 스레드 갯수 만큼 분할한다.
- 하나의 프로그램에서 여러 개의 실행 흐름을 만들고 실행하게 한다.
이제 본격적으로 자바 8 API에 추가된 Stream부터 알아보도록 하자.
- Stream
- Stream vs Collection
- 데이터 계산 방법
- 현재 자료구조가 포함하는 모든 값을 메모리에 저장하는 컬렉션과 달리 스트림은 이론적으로 요청할 때만 요소를 계산한다.
- 사용자가 요청하는 값만 스트림에서 추출한다는 것이 핵심
- 데이터 반복 처리 방법
- 컬렉션은 사용자가 직접 요소를 반복하는 외부 반복
- "미영아 상자안에 뭐 있어?" "인형" "인형 가져올래? 그리도 뭐있어?" "과자" "과자 가져올래? 그리고 뭐있어?" "양말" "양말 가져올래 그리고 뭐있어?" "이제 없어" "그럼 됐어"
- 스트림 라이브러리는 반복을 알아서 처리하고 결과 스트림값을 어딘가에 저장해주는 내부반복
- "미영아 상자안에 있는 거 다가져와"
- "미영아 상자안에 있는 거 다가져와"
- 컬렉션은 사용자가 직접 요소를 반복하는 외부 반복
- 데이터 계산 방법
- Stream vs Collection
- 병렬스트림 (parallelStream)
- ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행한다. compute 메서드는 병렬로 실행할 수 있을 만큼 태스크의 크기가 충분히 작아졌는지 확인하고 크면 두 개의 새로운 ForkJoinSumCalculator로 할당, 이 과정이 반복되어 분할을 반복하고 서브 태스크들이 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트에서 역순으로 방문하여 부분 결과를 합쳐 최종 결과를 계산한다.
- 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스티림
- 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
- 병렬스트림에서 사용하는 스레드 풀 설정
- 병렬 스트림은 내부적으로 ForkJoinPool을 사용
- ForkJoinPool
- 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService인터페이스를 구현한다.
- 쓰레드풀 서비스의 일종으로 fork를 통해 task를 분담하고 join을 통해 합침
- 기본적으로 ExecutorService의 구현체지만 다른 점은 thread들이 개별 큐를 가지며 자신에게 아무런 task가 없으면 다른 Thread의 task를 가져와 처리하여 CPU 자원이 늘지 않고 최적의 성능을 낼 수 있다.
- ForkJoinPool
- 프로세서수가 반환하는 값에 상응하는 스레드를 가진다.(기기의프로세서수)
- Runtime.getRuntime().availableProcessors();
- 병렬 스트림은 내부적으로 ForkJoinPool을 사용
- 언제 사용하나?
- 전체 사이즈를 아는 array, arrayList와 같은 경우 (분할이 잘 이루어지는 데이터 구조)
- 병렬로 처리되는 작업이 독립적인 경우 (stream의 중간 단계 연산 중 sorted() 혹은 distinct() 사용 XX)
- 공유된 가변 상태 피해야 함
- 멀티코어 간의 데이토 이동은 비싸기에 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만
- 스트림을 재귀적으로 분할하고, 각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합치는 것보다도 더 시간이 오래 걸리는 작업!
- limit이나 findFirst처럼 요소의 순서에 의존하는 연산이 없을 때
- findFirst대신 findAny는 괜찮다.
- 병합과정 (Collector나 Combilner) 비용이 비싸면 안됨 (성능측정해보기)
- 주의해야 할 점
- 공유된 thread pool을 사용하기 때문에 성능장애를 만들 수 있음
- Future (자바5)
- Future과 CompletableFuture의 관계는 Collection과 Stream의 관계에 비유할 수 있는데, 자바 8에서 새로 제공하는 CompletableFuture클래스는 Future의 기능을 선언형으로 이용할 수 있도록 한 것이다.
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
return 시간이 오래걸리는 작업();
}
});
doSomthingElse();
try {
Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
//계산 중 예외 발생
} catch (InterruptedException ie) {
//현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException te) {
//Future완성되기 전에 타임아웃 발생
}
- CompletableFuture - 안정적 비동기 프로그래밍
- 언제 사용하나 ?
- 페이스북의 데이터를 기다리는 동안 트위터 데이터를 처리하고 싶다!
- 병렬성이 아니라 동시성을 필요로 하는 상황 (조금식 연관된 작업을 같은 CPU에서 동작하는 것)
- 동시성은 프로그래밍 속성이고 병렬성은 하드웨어 수준에서 지원
- 병렬 스트림과의 비교
- 병렬 스트림 버전에서는 검색해야할 대상이 스레드 수 보다 많아지는 경우 실행중인 스레드가 종료되길 기다려야 한다.
- CompletableFuture는 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다.
- 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.
- 병렬 스트림과 CompletableFuture 둘 중 어떤 것을 사용해야 할까?
- I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 병렬 스트림
- I/O를 기다리는 작업을 병렬로 실행하고, 스레드 수를 적절하게 설정해야 하는 경우
- 언제 사용하나 ?
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = 오래 걸리는 작업 메소드();
futurePrice.complete();
} catch (Exception ex) {
futurePrice.completeExceptionally(ex); // Future에 에러 포함해서 종료
}
}).start();
return futurePrice;
}
Future 사용 Example
Future<Double> futurePrice = shop.getPriceAsync("dd");
다른 작업 메소드 (); // 가격 가져오게 하고 다른 작업 하기
try {
double price = futurePrice.get();
} catch (Exception e) {
throw RuntimeException(e);
}
비동기 메소드 구현 및 사용
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> 오래 걸리는 작업 메소드(product));
}
CompletableFuture를 직접 만들지 않고 간단하게 만드는 방법
private final Executor executor =
Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
});
애플리케이션에 맞는 커스텀 Executor
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shop.stream()
.map(shop -> CompletableFuture.supplayAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(()-> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}
동기 작업과 비동기 작업 조합하기
Future<Double> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> 메소드())
.completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
(price, rate) -> price * rate ))
.orTimeout(3, TimeUnit.SECONDS);
타임아웃 사용하기
출처 2 모던 자바 인 액션 (라울-게이브리얼 우르마, 마리오 푸스코, 앨런 마이크로프트 지음 / 한빛미디어)
나의 결론,
우선 나는 AWS에서 Region 별로 API를 호출해야 하고, 각각의 결과를 바탕으로 또 한번 API를 각각 호출해야 한다.
따라서 단순 계산식이 아니기에 CompleteFuture를 사용하고, ThreadPool크기를 Region숫자로 fix할 예정이다.
또한 map을 활용하여 비동기 작업과 조합하는 방법으로 개발하겠다.