현재 진행하고 있는 프로젝트에서 데이터 처리하는 데 시간이 많이 소요되는 부분이 있어 

이것을 개선하고자 관련해서 공부를 해보려 한다. 

 

우선 내가 처한 상황은,

AWS ComputeOptimizer 서비스 API를 사용하려는데 모든 Region에 대한 데이터가 필요함.

그런데 각 Region별로 Client를 생성해서 API 호출 및 데이터 변환에 많은 시간이 소요.

따라서, 해당 부분의 성능을 향상시켜 페이지 로딩시 데이터를 최대한 빨리 가져오도록 하려 한다. 

 

 

먼저 기초부터 알고 가자!

  • JVM
    • 자바 애플리케이션을 클래스 로더를 통해 읽어 들여 자바 API와 함께 실행하는 역할 
    • JAVA와 OS 사이에서 중개자 역할을 수행하여, OS에 구애받지 않고 사용할 수 있게 해준다.
    • 메모리관리, GarbageCollection을 수행한다. 
    • 스택기반의 가상머신
  • Java Program 실행과정 
    • 프로그램이 실행되면 JVM이 OS로부터 메모리를 할당받음 
      • JVM은 이 메모리를 용도에 따라 여러 영역으로 나누어 관리 (아래 참고)
    • 자바 컴파일러가 자바 소스코드를 읽어들여 바이트코드(.class)로 변환 
    • Class Loader를 통해 class 파일들을 JVM으로 로딩 
    • Execution engine을 통해 class파일 해석
    • 해석된 바이트코드는 Runtime Data Areas에 배치되고 실행된다.
    • JVM은 필요에 따라 GC 와 같은 관리 작업을 수행한다. 

출처 : asfirstalways.tistory.com/158

HEAP

 

  • JVM 메모리 구조(RuntimeDataAreas)
    • Thread별로 생성되는 데이터 영역
      • pc register
        • Thread가 어떤 부분을 어떤 명령으로 실행해야 할 지 기록한다.
      • stack
        • 로컬 변수, 일부 실행 결과, 메소드 호출 또는 반환 등을 저장
        • 메소드 호출 시마다 각각의 스택 프레임이 생성되고, 메소드 수행이 끝나면 삭제됨
        • 메소드 안에서 사용되는 값을 저장 
      • native method stack
    • 전체 Thread가 공유하는 데이터 영역
      • heap
        • 객체를 저장하는 영역, 할당된 객체 해제는 Garbage Collector에 의해서만 가능 
        • Permanent Generation
          • 객체 주소값 저장
        • New/Young
          • Eden : 객체 최초 생성 공간
          • Survivor 0/1 : Eden에서 참조 되는 객체들이 저장되는 공간
        • Old
          • New area에서 일정 시간 참조되고 있는 객체들이 저장되는 공간
      • method area
        • 클래스 정보를 처음 메모리 공간에 올릴 때 초기화되는 대상을 저장하기 위한 메모리 공간
        • 클래스의 필드, 메소드 정보, static 변수, 메소드와 생성자의 바이트 코드, 클래스, 인터페이스와 관련된 Runtime Constant Pool이 저장되며, Garbage Collector 대상이 아님

출처 : hongsii.github.io/2018/12/20/jvm-memory-structure/

 

 

  • Thread
    • 프로세스 내부에 존재하는 프로세스보다 더 작은 작업의 단위
    • 한 프로세스 내에서 데이터를 병렬적으로 처리하고 각 쓰레드의 결과를 합쳐서 사용하게 되면 데이터 처리를 하는 데 시간을 절약할 수 있다. 
    • 생성방법
      • Runnable 인터페이스 사용
        • run() 메소드를 구현하여 Thread 사용할 수 있음
      • Thread 클래스 사용 (Runnable 클래스 구현한 것)
    • 주의해야 할 점
      • 운영체제가 지원하는 스레드 수를 초과해 사용하면 자바 애플리케이션이 예상치 못한 방식으로 크래시될 수 있으므로 기존스레드가 실행되는 상태에서 계속 새로운 스레드를 만드는 상황이 일어나지 않도록 주의해야 한다. 
  • ThreadPool
    • 하드웨어에 맞는 수의 태스크를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버헤드 없이 제출할 수 있다. 
    • 자바 ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공
    • newFixedThreadPool 메서드를 이용해 스레드 풀을 만든다. 
      • 이 메서드는 워커 스레드라 불리는 ExcecutorService를 만들고 이들은 스레드 풀에 저장된다. 
    • 스레드풀에서 사용하지 않은 스레드로 제출된 태스크를 먼저 온 순서대로 실행하고, 실행이 종료되면 이들 스레드는 풀로 반환된다. 
    • 스레드 풀을 사용할 때 주의해야 할 점
      • 잠을 자거나 I/O를 기다리거나 네트워크 연결을 기다리는 테스크가 있다면 주의하기 
      • 처음 제출한 태스크가 기존 실행 중인 태스크가 나중의 테스크 제출을 기다리는 상황 피하기 (데드락)
      • 프로그램을 종료하기 전에 모든 스레드 풀을 종료해야 함
  • Multi Thread
    • Stack 영역을 스레드 갯수 만큼 분할한다.
    • 하나의 프로그램에서 여러 개의 실행 흐름을 만들고 실행하게 한다.  

 

이제 본격적으로 자바 8 API에 추가된 Stream부터 알아보도록 하자.

  • Stream
    • Stream vs Collection
      • 데이터 계산 방법
        • 현재 자료구조가 포함하는 모든 값을 메모리에 저장하는 컬렉션과 달리 스트림은 이론적으로 요청할 때만 요소를 계산한다.
        • 사용자가 요청하는 값만 스트림에서 추출한다는 것이 핵심
      • 데이터 반복 처리 방법
        • 컬렉션은 사용자가 직접 요소를 반복하는 외부 반복
          • "미영아 상자안에 뭐 있어?" "인형" "인형 가져올래? 그리도 뭐있어?" "과자" "과자 가져올래? 그리고 뭐있어?" "양말" "양말 가져올래 그리고 뭐있어?" "이제 없어" "그럼 됐어"
        • 스트림 라이브러리는 반복을 알아서 처리하고 결과 스트림값을 어딘가에 저장해주는 내부반복 
          • "미영아 상자안에 있는 거 다가져와"

  • 병렬스트림 (parallelStream)
    • ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행한다. compute 메서드는 병렬로 실행할 수 있을 만큼 태스크의 크기가 충분히 작아졌는지 확인하고 크면 두 개의 새로운 ForkJoinSumCalculator로 할당, 이 과정이 반복되어 분할을 반복하고 서브 태스크들이 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트에서 역순으로 방문하여 부분 결과를 합쳐 최종 결과를 계산한다.  
    • 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스티림
    • 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다. 
    • 병렬스트림에서 사용하는 스레드 풀 설정
      • 병렬 스트림은 내부적으로 ForkJoinPool을 사용
        • ForkJoinPool
          • 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService인터페이스를 구현한다.  
          • 쓰레드풀 서비스의 일종으로 fork를 통해 task를 분담하고 join을 통해 합침
          • 기본적으로 ExecutorService의 구현체지만 다른 점은 thread들이 개별 큐를 가지며 자신에게 아무런 task가 없으면 다른 Thread의 task를 가져와 처리하여 CPU 자원이 늘지 않고 최적의 성능을 낼 수 있다. 
      • 프로세서수가 반환하는 값에 상응하는 스레드를 가진다.(기기의프로세서수)
        • Runtime.getRuntime().availableProcessors();
    • 언제 사용하나? 
      • 전체 사이즈를 아는 array, arrayList와 같은 경우 (분할이 잘 이루어지는 데이터 구조)
      • 병렬로 처리되는 작업이 독립적인 경우 (stream의 중간 단계 연산 중 sorted() 혹은 distinct() 사용 XX)
        • 공유된 가변 상태 피해야 함
      • 멀티코어 간의 데이토 이동은 비싸기에 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 
        • 스트림을 재귀적으로 분할하고, 각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합치는 것보다도 더 시간이 오래 걸리는 작업!
      • limit이나 findFirst처럼 요소의 순서에 의존하는 연산이 없을 때 
        • findFirst대신 findAny는 괜찮다.
      • 병합과정 (Collector나 Combilner) 비용이 비싸면 안됨 (성능측정해보기)
    • 주의해야 할 점 
      • 공유된 thread pool을 사용하기 때문에 성능장애를 만들 수 있음 

fork-join model

 

  • Future (자바5)
    • Future과 CompletableFuture의 관계는 Collection과 Stream의 관계에 비유할 수 있는데, 자바 8에서 새로 제공하는 CompletableFuture클래스는 Future의 기능을 선언형으로 이용할 수 있도록 한 것이다.
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
	public Double call() {
    		return 시간이 오래걸리는 작업();
    }
});

doSomthingElse();

try {
	Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
	//계산 중 예외 발생
} catch (InterruptedException ie) {
	//현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException te) {
	//Future완성되기 전에 타임아웃 발생
}

 

 

  • CompletableFuture - 안정적 비동기 프로그래밍
    • 언제 사용하나 ? 
      • 페이스북의 데이터를 기다리는 동안 트위터 데이터를 처리하고 싶다!
      • 병렬성이 아니라 동시성을 필요로 하는 상황 (조금식 연관된 작업을 같은 CPU에서 동작하는 것)
    • 동시성은 프로그래밍 속성이고 병렬성은 하드웨어 수준에서 지원 
    • 병렬 스트림과의 비교 
      • 병렬 스트림 버전에서는 검색해야할 대상이 스레드 수 보다 많아지는 경우 실행중인 스레드가 종료되길 기다려야 한다. 
      • CompletableFuture는 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다.
        • 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다. 
    • 병렬 스트림과 CompletableFuture  둘 중 어떤 것을 사용해야 할까?
      • I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 병렬 스트림
      • I/O를 기다리는 작업을 병렬로 실행하고, 스레드 수를 적절하게 설정해야 하는 경우 
public Future<Double> getPriceAsync(String product) {
	CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            try {
                double price = 오래 걸리는 작업 메소드();
                futurePrice.complete();
            } catch (Exception ex) {
                futurePrice.completeExceptionally(ex); // Future에 에러 포함해서 종료
            }
        }).start();
        return futurePrice;
}

Future 사용 Example

 

Future<Double> futurePrice = shop.getPriceAsync("dd");

다른 작업 메소드 (); // 가격 가져오게 하고 다른 작업 하기

try {
	double price = futurePrice.get();
} catch (Exception e) {
	throw RuntimeException(e);
}

비동기 메소드 구현 및 사용

 

public Future<Double> getPriceAsync(String product) {
	return CompletableFuture.supplyAsync(() -> 오래 걸리는 작업 메소드(product));
}

CompletableFuture를 직접 만들지 않고 간단하게 만드는 방법 

 

private final Executor executor = 
	Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
   		public Thread newThread(Runnable r){
        	Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
            
        }
    }
});

애플리케이션에 맞는 커스텀 Executor

 

public List<String> findPrices(String product) {
	List<CompletableFuture<String>> priceFutures = 
      shop.stream()
      .map(shop -> CompletableFuture.supplayAsync(() -> shop.getPrice(product), executor))
      .map(future -> future.thenApply(Quote::parse))
      .map(future -> future.thenCompose(quote -> 
      			CompletableFuture.supplyAsync(()-> Discount.applyDiscount(quote), executor)))
      .collect(toList());
      
      
	return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}

동기 작업과 비동기 작업 조합하기

 

Future<Double> futurePriceInUSD = 
	CompletableFuture.supplyAsync(() -> shop.getPrice(product))
    				 .thenCombine(CompletableFuture.supplyAsync(() -> 메소드())
                     	.completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
                    	 (price, rate) -> price * rate ))
                     .orTimeout(3, TimeUnit.SECONDS);
                        

타임아웃 사용하기 

 

 

출처 1 m.blog.naver.com/PostView.nhn?blogId=tmondev&logNo=220945933678&proxyReferer=https:%2F%2Fwww.google.com%2F

출처 2 모던 자바 인 액션 (라울-게이브리얼 우르마, 마리오 푸스코, 앨런 마이크로프트 지음 / 한빛미디어)

 

 

나의 결론, 

우선 나는 AWS에서 Region 별로 API를 호출해야 하고, 각각의 결과를 바탕으로 또 한번 API를 각각 호출해야 한다. 

따라서 단순 계산식이 아니기에 CompleteFuture를 사용하고, ThreadPool크기를 Region숫자로 fix할 예정이다. 

또한 map을 활용하여 비동기 작업과 조합하는 방법으로 개발하겠다. 

 

 

+ Recent posts