자바

모던 자바 - Reactive Programming 리액티브 프로그래밍 - java 9 Flow

모디(modi) 2021. 8. 25. 18:25

신규 프로젝트에 참여하게 됐는데, 기존에 만들어 오던 API 서버와는 다른 부분이 있었다.

바로, 신규 서버는 DB에 연결하지 않고 DB에 연결된 API 서버들과 내부 통신해서 데이터를 가져온다는 점이었다. 

 

Frontend 에서 Request를 보내면 2번 서버가 받는데 2번 서버는 아래 케이스의 일을 수행한다.

 

case

front에서 2번 서버로 request -> 2번은 4번 서버에 api 콜하여 인증 확인을 한다 -> 1번 서버에 api 콜을 해 PostgreSQL 데이터를 2번 서버로 가져와 데이터를 가공(선택) -> 가공한 데이터를 기반으로 3번 서버에 api 호출을 N번 한다. -> N번 호출해서 받은 데이터를 2번에서 가공 -> 완성된 형태의 response를 frontend로 전달 

 

 

1번 서버 - RDS 디비와 연결 / 4번 서버 - 인증 서버로 RDS 디비와 연결 / 3번 서버 - Elasticsearch 검색엔진으로부터 데이터를 가져옴

 

나는 2번 서버 개발을 맡게 되었는데 3번 서버로 날린 여러 API 호출 결과들을 빠르게 조합해서 Front로 전달해야 했기 때문에 비동기로 해당 작업들을 수행하는 것이 좋겠다고 판단하였다. 

 

기존에는 RestTemplate과 모던 자바의 CompletableFuture 조합으로 비동기를 구현했었지만, 이번 프로젝트에는 새로운 기술을 익혀보고 싶어 WebFlux 를 이용해 보기로 했다. 그런데 WebFlux가 어떤건지 대충은 알았지만 정확하게 무엇인지, 구현하는 방법도 모른다는 문제점이.....

 

이제부터 리액트 프로그래밍에 대해 알아보고, 이것을 신규 프로젝트에 적용할 수 있는지, 할 수 있다면 어떻게 할 수 있는지 알아보도록 한다.

 

어려움이 닥칠 때 마다 찾는 '모던 자바 인 액션'

다시 이 책을 펼쳤고, 먼저 리액티브 프로그래밍에 대해 자세히 공부해보는 것이 좋겠다고 생각하여 이렇게 포스팅을 하게 되었다. WebFlux를 도입하기 전 "Reactor"에 대한 이해가 선행되어야 한다 해서 "Reactor"에 대해 먼저 알아봤다.

 

결론은 신규 프로젝트에 적용을 하진 못했지만, 관련해서 알아봤던 내용들을 정리해보겠다.

 

 

Reactive Programming

리액티브 시스템과 리액티브 프로그래밍은 다른 개념으로 리액티브 시스템은 시스템 레벨에서 아키텍트와 DevOps를 위한 생산성을 제공한다. 리액티브 프로그래밍은 리액티브 시스템의 구현 수준의 하위집합으로, 내부 로직 및 데이터 흐름 관리를 위한 구성요소 단계에서 높은 생산성을 제공한다. 

  • 정의
    • 데이터 흐름과 전달에 관한 프로그래밍 패러다임
    • 리액티브 스트림을 사용하는 프로그래밍
      • 리액티브 스트림 ? 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술
      • 역압력 (Back Pressure) ? 발행-구독 프로토콜에서 이벤트 스트림의 구독자가 발행자가 이벤트를 처리하는 속도보다 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치
  • 왜 이런 패러다임이 생겼나?
    • 적은 수의 스레드로 동시성을 처리하고 더 적은 하드웨어 리소스로 확장할 수 있는 비차단 웹 스택이 필요하다는 것
      • Servlet 3.1에서 non-blocking I/O를 위한 API를 제공했지만 이 API의 사용은 synchronous 이거나 blocking인 ServletAPI와 멀어지는 개념이었다.
    • Java 8에 람다 표현식이 추가됨
      • Java 8에 람다 표현식을 추가되면서 Fuctional Programming이 가능하게 되었고, 이것은 비동기 논리의 선언적 구성을 허용하는 non-blocking 애플리케이션에 대한 이점입니다.
    • 빅데이터/모바일부터 클라우드 기반 클러스터에 이르는 다양한 환경/밀리초 단위의 응답시간을 기대하는 사용패턴에서의 요구사항을 만족시켜 주기 위해
    • 다양한 시스템과 소스에서 들어오는 데이터 항목 스트림을 비동기적으로 처리해서 문제를 해결한다. 

 

  • Reactive의 속성 4가지 Keyword (핵심원칙)
    • 반응성: 일정하고 예상할 수 있는 빠른 반응시간
    • 회복성: 장애가 전파되지 않고 복구된다.
    • 탄력성: 작업량의 변화와 무관하다. (병목현상) -> 작업 부하 발생시 관련 컴포넌트에 할당된 자원 수 늘린다.
    • 메시지기반 : 컴포넌트 간의 약한 결합, 고립, 위치 투명성이 유지되도록 시스템은 비동기 메시지 전달에 의존
  • JDK에서 리액티브 프로그래밍을 제공하는 기술 
    • RxJava
    • Project Reactor
    • Spring Framework 5.0
  • RxJava
    • 자바로 리액티브 프로그래밍을 할 수 있는 라이브러리
      • 비동기 프로그래밍과 함수형 프로그래밍 기법을 활용한다
    • 복잡한 비동기 프로그램을 쉽게 개발할 수 있게 해준다. 

 

java.util.concurrent.Flow 클래스 

 

  • 자바 9에서는 리액티브 프로그래밍을 제공하기 위해 Flow클래스를 추가했다. 
    • 리액티브 스트림 프로젝트의 표준에 따라 발행-구독 모델을 지원
    • Akka, RxJava등의 리액티르 라이브러리는 Flow 클래스에 정의된 인터페이스를 구현한다. 
  • Flow클래스의 인터페이스
    • Publisher : 항목 발행
    • Subscriber : Publisher가 발행한 항복을 한개 또는 여러개 소비
    • Subscription : 위 소비 과정을 정적 메서드로 관리 / Publisher와 Subscriber 사이의 제어 흐름, 역압력 관리 
    • Processor : 프로세서는 게시자와 구독자 사이에 있는 구성 요소로써 Publisher에 시그널을 요청하거나 아이템을 Subscriber에게 Push함

https://ozenero.com/java-9-flow-api-example-processor

public interface Subscriber<T> {
    void onSubscribe(Subscription s); // 항상 처음 호출됨
    void onNext(T t); // 2번째로 호출되는 데 여러 번 호출될 수 있음
    void onComplete(); // 더 이상의 데이터가 없고 종료됨을 알림
    void onError(Throwable t); // Publisher에 장애가 발생했을 때 호출함
}

Subscriber 인터페이스는 Publisher가 관련 이벤트를 발행할 때 호출할 수 있도록 콜백 메서드 네개를 정의한다. 

Subscriber가 Publisher에 자신을 등록할 때 Publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달한다. Subscription 인터페이스는 메서드 두 개를 정의한다. Subscription은 첫 번째 메서드로 Publisher에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알릴 수 있다. 두 번째 메서드로는 Subscription을 취소, 즉 Publisher에게 더 이상 이벤트를 받지 않음을 통지한다.

 

public interface Subscription { 
    void request(long n);
    void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

플로 API를 사용하는 리액티브 애플리케이션의 생명주기

 

https://ozenero.com/java-9-flow-api-example-processor

  • 게시자는 프로세서와 함께 작동하도록 구독을 정의
  • 프로세서는 구독자와 함께 작동하도록 자체 구독을 정의
  • Subscriber::onNext() 메서드를 사용하여 게시자는 항목을 프로세서에 푸시하고 프로세서는 항목을 구독자에 푸시
  • Subscription::request() 메서드를 사용하여 프로세서는 게시자에게 항목을 요청, 구독자는 프로세서에 항목을 요청
  • 게시자 및 프로세서는 멀티스레딩을 위한 실행자를 정의하고, request() 및 onNext() 메서드가 비동기적으로 작동
  • Processor는 Subscriber와 Processor가 요청한 항목의 수요량이 다른 경우 항목을 저장하기 위한 데이터 버퍼 가짐

 

자바 9 플로 API를 직접 이용하는 첫 리액티브 애플리케이션 ( from 모던 자바 인 액션)

 

- TempInfo. 원격온도계로 0에서 99사이 온도를 보고

- TempSubscriber 각 도시에 설치된 센서가 보고한 온도 스트림 출력

 

import java.util.Random;

public class TempInfo {

    public static final Random random = new Random();
    
    private final String town;
    private final int temp;
    
    public TempInfo(String town, int temp) {
    	this.town = town;
        this.temp = temp;
    }
    
    public static TempInfo fetch(String town) {
    	if (random.nextInt(10) == 0) { // 10분의 1 확률로 작업 실패
        	throw new RuntimeException("Error!");
        }
        return new TempInfo(town, random.nextInt(100)); // 임의의 화씨 온도를 반환
    }
    
    @Override
    public String toString() {
    	return town + " : " + temp;
    }
    
    public int getTemp() { 
    	return temp;
    }
    
    public String getTown() {
    	return town;
    }
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TempSubscription implements Subscription {

    private final Subscriber<? super TempInfo> subscriber;
    private final String town;
    private static final ExecutorService executor = Executors.newSingleThreadExecutor();

    public TempSubscription (Subscriber<? super TempInfo> subscriber, String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        executor.submit( () -> {
            for (long l = 0L; l < n; l++) {
                try {
                    subscriber.onNext(TempInfo.fetch(town)); // 현재 온도를 subscriber로 전달
                } catch (Exception e) {
                    subscriber.onError(e); // 실패하면 subscriber로 에러 전달
                    break;
                }
            }
        });
    }

    @Override
    public void cancel() {
        subscriber.onComplete(); // 구독취소되면 subscriber로 전달
    }
}

ExecutorService는 비동기 모드에서 실행 중인 작업을 단순화하는 JDK API

일반적으로 ExecutorService는 스레드 풀과 여기에 태스크를 할당하기 위한 API를 자동으로 제공합니다.

 

 

public class TempSubscriber implements Flow.Subscriber<TempInfo> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(TempInfo tempInfo) {
        System.out.println(tempInfo);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        System.err.println(t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

 

Processor는 Subscriber이며 동시에 Publisher다.

Processor의 목적은 Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것이다. 

 

화씨를 섭시로 변환하는 Processor

import java.util.concurrent.Flow.*;

public class TempProcessor implements Processor<TempInfo, TempInfo> {

    private Subscriber<? super TempInfo> subscriber;

    @Override
    public void subscribe(Subscriber<? super TempInfo> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onNext(TempInfo temp) {
        subscriber.onNext(new TempInfo(temp.getTown(),
                (temp.getTemp() - 32) * 5 / 9));
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }

Publisher의 subscribe 메서드는 업스트림 Subscriber를 Processor로 등록하는 동작을 수행한다. 

 

import java.util.concurrent.Flow.Publisher;

public class Main {
    public static void main(String[] args) {
        getCelsiusTemperatures("New York").subscribe(new TempSubscriber());
    }

    private static Publisher<TempInfo> getCelsiusTemperatures(String town) {
        return subscriber -> {
            TempProcessor processor = new TempProcessor();
            processor.subscribe(subscriber);
            processor.onSubscribe(new TempSubscription(processor, town));
        };
    }
}

 

로그를 찍어보며 알아본 메서드 콜 순서

 

  • Publisher 생성
  • Processor 생성자 동작- Subscriber를 set해줌
  • Subscription 생성자 동작- Subscriber와 전달받은 매개변수(town)를 set해줌 (NewYork의 Subscription이 만들어짐)
  • Processor의 onSubscribe() 메소드 동작- Subscriber의 onSubscribe() 메소드 호출/매개변수로 subscription전달
  • Subscriber 생성자 동작- 전달받은 subscription 을 set해줌

아래 동작 에러 발생할 때까지 반복 수행

  • Subscription 의 request() 메소드 동작 - Subscriber의 onNext() 메소드 호출 (여기
  • Processor의 onNext() 메소드 동작- Subscriber의 onNext() 메소드 호출
  • Subscriber의 onNext() 메소드 동작- Subscription 의 request() 메소드 호출 

 


  • Subscription의 request() 메소드 동작 중 에러 발생
  • Processor의 onError(Throwable t) 메소드 동작 -  subscriber.onError(t) 메소드 호출함
  • Subscriber의 onError(Throwable t) 메소드 동작

 

구독할 아이의 Subscription을 만들어서 Subscriber에게 전달하면 이제 구독-발행 준비가 완료된다.

Subscriber의 onNext() 메소드가 계속 호출되고, 동작 완료되면 다시 Subscription의 request() 메소드를 호출시킨다. 

로직은 Subscriber의 onNext() 메소드에 쓰면 될 것 같다. 

 

To be continued...

 

 

출처 

모던자바인액션 (라울-게이브리얼 우르마)

https://ozenero.com/java-9-flow-api-example-processor

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

https://brunch.co.kr/@springboot/152

https://12bme.tistory.com/570

https://velog.io/@ehdrms2034/Java-89-Reactive-Java

 

 

 

- TO READ

https://ckddn9496.tistory.com/158