• OSI 7계층

네트워크 통신의 7단계 과정

https://www.youtube.com/watch?v=1pfTxp25MA8 

  • TCP 동작 방식

https://beenii.tistory.com/127

 

TCP 동작 방식 (3-way handshake, 4-way handshake)

웹 서비스 동작 방식을 보면, 사용자가 url을 입력하면 도메인 주소를 이용하여 DNS에서 IP 주소를 얻어오고, 그렇게 얻어온 IP 주소를 웹 데이터 형식으로 변하여 TCP 통신을 통해 웹 서버와 주고받

beenii.tistory.com

 

사용자가 url을 입력하면 도메인 주소를 이용하여 DNS에서 IP 주소를 얻어오고,

그렇게 얻어온 IP 주소를 웹 데이터 형식으로 변하여 TCP 통신을 통해 웹 서버와 주고받게 됩니다.

이때 TCP 통신을 하기 위해 3-way handshake로 접속을, 4-way handshake로 접속 해제

 

 

 

 



사수님께서 Persistence Context에 관한 좋은 내용이 담긴 글을 공유해주셨다. 

https://msolo021015.medium.com/jpa-persistence-context-deep-dive-2f36f9bd6214

 

JPA Persistence Context Deep Dive

최근 저는 새로운 프로젝트를 진행하면서, JPA를 도입하였습니다.

msolo021015.medium.com

 

@Modifying 을 사용중인 나에게 꽤나 충격적이었음..

findById를 해도 update된 데이터가 아니길래 정말 한참을 헤맸었는데...

역시 개념에 대한 정확한 이해 없이 개발을 하면 이렇게 되나보다.

 

JPA를 공부하겠다고 산 두꺼운 서적을 다시 한 번 제대로 살펴봐야겠다.

영속성 컨텍스트를 정확하게 이해하지 않고 그 누가 JPA를 사용한다고 하랴..

 

그래서 다시 한번 중요 개념을 정리하고 넘어가보자 한다.

수없이 봤던 영속성 컨텍스트의 LifeCycle

  • EntityManager & EntityManagerFactory
    • JPA는 스레드 생성시 EntityManagerFactory에서 EntityManager를 생성하고, EntityManager는 DB 커넥션 풀을 사용해 DB에 접근한다. 

 

  • Persistence Context
    • 엔티티를 영구 저장하는 환경 
    • EntityManager.persist(엔티티)를 하게 되면 엔티티를 영속화하여 영속성 컨텍스트에 저장을 하게 되며, 영속성 컨텍스트에 접근하기 위해서는 EntityManager를 통해야 한다. 
    • DB에 저장하게 되는 것은 그 이후의 얘기다. 

 

  • LifeCycle
    • New 
      • 비영속 상태 - 영속성 컨텍스트와 전혀 관계가 없는 상태 
        • 예를 들면 Member m = new Meber(); 이렇게 객체만 생성한 상태 
    • Managed
      • 영속 상태 - 영속성 컨텍스트에 저장된 상태로 DB에 저장되진 않고, 트랜잭션 커밋 시점에 쿼리가 DB로~
      • EntityManager
        // 객체를 생성한 상태(비영속) 
        Member member = new Member();
        member.setId("member1");
        member.setUsername("회원1");
        EntityManager em = emf.createEntityManager();
        em.getTransaction().begin();
        // 객체를 저장한 상태(영속) 
        em.persist(member)
    • Detached
      • 영속성 컨텍스트에 저장되었다가 분리된 상태
        • em.detach(member);
    • Removed
      • 삭제되고, DB에 쿼리 날라감
        • em.remove(member);

 

  • 왜 PersistenceContext (영속성 컨텍스트)를 사용하는가!
    • 1차 캐시
      • 영속성 컨텍스트에는 내부에 1차 캐시가 존재하고, @Id로 선언한 필드의 값과 엔티티를 Key,Value 형태로 캐시에 저장하여, 해당 Key를 갖는 엔티티를 조회할 때 캐시에서 가져다 쓰게 되므로 DB를 갔다 오는 수가 줄어들게 된다. 
      • 예를 들어 ~.findById(123);을 하게 되면 123이라는 key로 조회된 entity를 1차 캐시에 저장하게 된다. 해당 스레드에서 다시 123의 key값을 가지는 entity를 불러오게 되면 db에서 불러오는게 아니라 1차 캐시에서 가져오게 된다. 
    • 동일성 보장 
      • 트랜잭션 격리 수준을 데이터베이스가 아닌 애플리케이션 차원에서 제공하므로 같은 엔티티를 여러번 조회해도 같은 레퍼런스가 되어 동일성이 보장된다.
    • 쓰기 지연 SQL 저장소로 트랜잭션 지원
      • 엔티티들은 1차 캐시에 저장되고, INSERT같은 쿼리는 쓰기 지연 SQL 저장소에 쌓아서 commit()될 때 DB에 동시에 쿼리들을 보낸다. (이것이 flush())다.
    • Dirty Checking
      • 개인적으로 이것이 과연 이점일까 싶지만.... (당연한거 아닌가 하는 부분이라 해야할 까)
      • 영속성컨텍스트에 저장된 엔티티를 수정하면 (~.setName("홍길동)") commit() 혹은 flush()가 일어날 때 변경사항이 있음을 감지하고 DB에 UPDATE 쿼리를 보낸다.

 

  • Flush
    • 영속성 컨텍스트의 변경내용을 DB에 반영하여 싱크를 맞추는 작업
    • 커밋시, 영쓰기 지연 저장소에 쌓아놓은 쿼리들이 날라가 데이터베이스에 반영된다.
    • 호출시점 
      • entityManager.flush()로 직접 호출
      • 트랜잭션 커밋
      • JPQL 쿼리 실행
        • JPQL 실행 전에 무조건 flush()로 DB와의 싱크를 맞춘 다음에 JPQL 쿼리를 날리게끔 되어 있다.
    • 옵션
      • FlushModeType.COMMIT : 커밋할 때만
      • FlushModeType.AUTO : 기본 설정 값으로 커밋이나 쿼리를 실행할 때 
    • 플러시를 했다고 영속성 컨텍스트를 비우는 것은 아니고, 변경 내용을 데이터베이스와 동기화 하는 것이다.

 

  • Lazy Loading
    • 연관관계(@OneToMany 등)를 맺고 있는 엔티티를 검색할 때 연관관계가 있는 엔티티는 프록시 데이터로 채우고, 그것이 실제로 사용될 때만 검색하게 하여 불필요한 쿼리를 실행하지 않게 도와주는 역할 
    • 기본적으로 EAGER보다는 LAZY 관계를 맺게 해주어 추후에 일어날 성능상의 문제를 방지하도록 하자.

 

  • N+1 Problem 
    • 연관관계가 설정된 엔티티를 조회할 경우에 조회된 데이터 개수만큼 연관관계의 조회 쿼리가 추가로 발생하는 문제 
    • Lazy냐 Eager냐 상관없이 발생하는 문제임...
    • 왜 발생하느냐?
      • jpaRepository에 정의된 메서드를 실행하면 JPQL이 싱행되게 된다. JPQL은 특정 SQL에 종속되지 않으면서 객체와 필드 이름을 가지고 쿼리를 하는데, 따라서 findAll()이란 메소드를 수행하게 되면 딱 그 엔티티에만 SELECT절을 날리게 되고, 연관관계인 엔티티는 별도로 호출하게 되는 것이다. 
    • 해결방법은?
      • FetchJoin을 사용하면 되는데 문제는 그렇게 되면 LazyLoading으로 해놓은 것이 무의미하게 되는 것, 연관관계를 맺고 있는 것을 한번에 다 가져오는 것이기 때문. (그것도 outer join으로..) 
      • 현재 운영중인 서비스에서는 획기적인 대안을 찾지 못했기 때문에 쿼리마다 특성을 파악하고 필요에 따라 FetchJoin을 사용하고 있는 상황이다.

 

 

 

 

참고

 https://ict-nroo.tistory.com/130 [개발자의 기록습관]

오랜만에 신규 프로젝트를 생성하여 개발하고 있는데, 설계단에서 조금 어려움을 겪고 있다. 기존 프로젝트보다 더 나은 구현을 해보고 싶은데 생각보다 손이 나가질 않고 있다. 그래서 '개발자가 반드시 정복해야 할 객체 지향과 디자인패턴' 책을 펼쳤고, 책 내용 중 SOLID 부분을 다시금 정리하고 가보려 한다. 

 

 

단일책임원칙 (Single Responsibility Principle)

  • 클래스는 단 한 개의 책임을 가져야 한다. 
  • 예를 들면, 
    • 어떤 클래스에 HttpClinet() 클래스에서 데이터를 로드하는 메소드, 로드된 데이터로 파싱하는 메소드가 있다고 치자.
    • 그런데 HTTP 프로토콜에서 소켓 시반의 프로토콜로 변경되었다.
    • 그렇다면 데이터를 로드하는 메소드, 그리고 파싱하는 메소드 두개 다 변경을 해야 하는 상황인 것이다
    • 이러한 연쇄적은 코드 수정은 두 개의 책임이 한 클래스에 있기 떄문이라고 볼 수 있다. 
    • 데이터를 읽는 것과 데이터를 파싱해서 화면에 보여주는 책임을 분리해야 한다. 
  • 단일 책임 원칙을 잘 지키려면? 
    • 메서드를 실행하는 것이 누구인지 확인해 보는 것 
    • 어떤 클래스에 두개의 메서드가 있는데 각각의 메서드가 A,B 클래스 즉 2개의 클래스에서 사용되는 것이라면 책임 분리 후보가 될 수 있다.

 

개방폐쇄원칙 (Open-closed Principle)

  • 확장에는 열려 있어야 하고, 변경에는 닫혀 있어야 한다.  
    • 기능을 변경하거나 확장할 수 있으면서 그 기능을 사용하는 코드는 수정하지 않는다. 
  • 한 인터페이스를 사용하는 클래스는 인터페이스를 구현한 클래스가 추가되더라도 변경되지 않을 것이다. 

 

리스코프 치환 원칙 (Liskov Substitution Principle)

  • 상위 타입의 객체를 하위 타입의 객체로 치환해도 상위 타입을 사용하는 프로그램은 정상적으로 동작해야 한다. 
  • 리스코프 치환 원칙이 지켜지지 않은 대표적인 예  
    •  이런 코드가 있는데 특수 Item은 무조건 할인을 해주지 않는 정책이 추가되었다고 하자. 이를 반영하기 위해 Coupon 클래스를 아래와 같이 수정할 수 있을 것이다.
    • public class Coupon { public int claculateDiscountAmount(Item item) { return item.getPrice(). * discountRate; } }
    •  위 코드는 아주 흔한 리스코프 치환 원칙 위반 사례이다. Item 타입을 사용하는 코드는 SpecialItem 타입이 존재하는지 알 필요 없이 오직 Item 탗입만 사용해야 하는데 SpecialItem 타입인지의 여부를 확인하고 있다는 것은 SpecialItem이 상위 타입인 Item을 완벽하게 대체하지 못하는 상황이라고 볼 수 있는 것이다. 
    • public class Coupon { public int calculateDiscoutnAmount(Item item) { if (item instanceof SpecialItem) // LSP 위반 발생 return 0; return item.getPrice() * discountRate; } }
    • 타입을 확인하는 기능 (instanceof연산자 같은..)을 사용하는 것은 전형적인 리스코프 치환 언칙을 위반할 때 발생하는 증상이다. 새로운 종류의 하위 타입이 생길 때마다 상위 타입을 사용하는 코드를 수정해줘야 할 가능성을 높이는 것은 개방 폐쇄 원칙을 지킬 수도 없게 하는 것이다. 
    • public class Item { 
      	public boolean isDiscountAbailable() {
          	return true;
          }
      }
      
      public class SpecialItem extends Item {
      	@Override
          public boolean isDiscountAbailable() {
          	return false;
          }
      }
      Item 클래스에 가격 할인 가능 여부를 판단하는 기능을 추가하고, SpecialItem 클래스는 이 기능을 알맞게 재정의 했다. 이렇게 함으로써 Item 클래스만 사용하도록 구현할 수 있게 되었다.
    • public class Coupon {
      	public int calculateDiscountAmount(Item item) {
          	if (!item.isDiscountAvailable()) // instanceof 연산자 사용 제거 
              	return 0;
                  
              return item.getPrice() * discountRate;
          }
      }
      리스코프 치환 원칙이 지켜지지 않으면 쿠폰 예제에서 봤듯이 개방 폐쇄 원칙을 지킬 수 없게 된다. 개방 폐쇄 원칙을 지키지 않으면 기능 확장을 위해 더 많은 부분을 수정해야 하므로, 리스코프 치환 원칙을 지키지 않으면 기능을 확장하기가 어렵게 된다.

 

인터페이스 분리 원칙 (Interface Segregation Priciple) 

  • 인터페이스는 그 인터페이스를 사용하는 클라이언트를 기준으로 분리해야 한다. 
    • 클라이언트가 자신이 이용하지 않는 메서드에 의존하지 않아야 한다는 원칙으로 말할 수 있다.
    • 예를 들어
      • AServiceInterface에 읽기, 쓰기, 삭제가 구현되어 있다고 치자. 그런데 읽기 부분에 변경이 발생했다고 치면 쓰기/삭제 등 변경이 필요 없는 소스 코드도 다시 컴파일해야 하는 경우가 생기는 것이다. 이럴 때에는 쓰기/읽기/삭제를 각각의 인터페이스들로 분리함으로써 각 클라이언트가 사용하지 않는 인터페이스에는 변경이 발생하더라도 영향을 받지 않도록 해야 한다. 
      • 자바의 경우 사용하지 않는 인터페이스 변경에 의해 발생하는 소스재컴파일 문제가 발생하진 않지만 인터페이스 분리 원칙은 재컴파일 문제만 관련된 것이 아니라 용도에 맞게 인터페이스를 분리하는 것, 즉 단일 책임 원칙과도 연결된다.

 

의존 역전 원칙 (Dependency Inversion Priciple)

  • 고수준 모듈은 저수준 모듈의 구현에 의존해서는 안 된다. 저수준 모듈이 고수준 모듈에서 정의한 추상 타입에 의존해야 한다.
  • 저수준 모듈이 변경되더라도 고수준 모듈은 변경되지 않는 것! 
    • 고수준 모듈 : 어떤 의미 있는 단일 기능을 제공하는 모듈
    • 저수준 모듈 : 고수준 모듈의 기능을 구현하기 위해 필요한 하위 기능의 실제 구현 
    • 예를 들어, 
      • 1)
        • 암호화 예의 경우 바이트 데이터를 암호화한다는 것이 이 프로그램의 의미 있는 단일 기능으로서 고수준 모듈에 해당된다. 고수준 모듈은 데이터 읽기, 암호화, 데이터 쓰기라는 하위 기능으로 구성되는데, 저수준 모듈은 이 하위 기능을 실제로 어떻게 구현할지에 대한 내용을 다룬다.
      • 2) 
        • '쿠폰을 적용해서 가격 할인을 받을 수 있다.' '쿠폰은 동시에 한 개만 적용 가능하다' --> 고수준
        • '금액 할인 쿠폰', '비율할인쿠폰' 등 다영한 쿠폰이 존재 --> 저수준 
        • 쿠폰을 이용한 가격 계산 모듈이 개별적인 쿠폰 구현에 의존하게 되면 새로운 쿠폰이 추가되거나 변경될 때마다, 가격 계산 모듈이 변경되는 상황이 초래된다. 
  • 의존 역전 원칙은 앞서 리스코프 치환 원칙과 함께 개방 폐쇄 원칙을 따르는 설계를 만들어 주는 기반이 된다. 

 

 

출처 

책 - 개발자가 반드시 정복해야 할 객체 지향과 디자인패턴 (최범균 지음)

원래 RestTemplate을 통해 Internal API를 호출해왔었지만

RestTemplate이 Deprecated가 되어 WebClient 를 통해 API 통신을 구현하기로 하였다.

WebClient구현 과정에는 Mono Type의 Request를 생성하는 부분이 있어 이를 메소드로 분리하였고,

WebClient의 .onStatus 부분에는 중복코드가 있어 이이 부분을 TypeToken을 이용해 중복코드를 방지하기로 하였다.

 

TypeToken을 어떻게 활용하였는지 나중에 까먹지 않고 다시 사용하기 위해 아주 짧게 기록하겠다.

 

TypeToken

특정 타입의 클래스 정보를 넘겨서 타입 안전성을 꿰하도록 코드를 작성하는 기법을 TypeToken이라 한다.

 

 Mono<ResponseTemplate> monoResponse = webClient.post()
                .uri(uri)
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .accept(MediaType.APPLICATION_JSON)
                .body(Mono.just(request), ResponseTemplate.class) 
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, subscribeError(request, NotAppropriateRequestException.class))
                .onStatus(HttpStatus::is5xxServerError, subscribeError(request, InternalServerException.class))
                .bodyToMono(ResponseTemplate.class);

subscribeError 가 해당 부분으로 Function을 return하도록 하였다. 

해당 메소드는 아래와 같다.

 

private <T extends RuntimeException> Function<ResponseTemplate, Mono<? extends Throwable>> subscribeError(Request request, Class<T> e) {
        return response -> {
            response.bodyToMono(ErrorResponseTemplate.class).subscribe(v -> {
                log.error("status code : {}, message : {}", v.getStatus(), v.getMessage());
            });
            throw e.cast("error : " + request.getRequestType());
        };
    }

코드 내용은 회사와 무관한 내용으로 변경함.

 

 

RuntimeException을 extend한 CustomException을 StatusCode 별로 throw하는 내용이다.

 

아래에 TypeToken과 관련된 자세한 내용이 담긴 포스팅을 기록해둔다.

https://yangbongsoo.gitbook.io/study/super_type_token

 

신규 프로젝트에 참여하게 됐는데, 기존에 만들어 오던 API 서버와는 다른 부분이 있었다.

바로, 신규 서버는 DB에 연결하지 않고 DB에 연결된 API 서버들과 내부 통신해서 데이터를 가져온다는 점이었다. 

 

Frontend 에서 Request를 보내면 2번 서버가 받는데 2번 서버는 아래 케이스의 일을 수행한다.

 

case

front에서 2번 서버로 request -> 2번은 4번 서버에 api 콜하여 인증 확인을 한다 -> 1번 서버에 api 콜을 해 PostgreSQL 데이터를 2번 서버로 가져와 데이터를 가공(선택) -> 가공한 데이터를 기반으로 3번 서버에 api 호출을 N번 한다. -> N번 호출해서 받은 데이터를 2번에서 가공 -> 완성된 형태의 response를 frontend로 전달 

 

 

1번 서버 - RDS 디비와 연결 / 4번 서버 - 인증 서버로 RDS 디비와 연결 / 3번 서버 - Elasticsearch 검색엔진으로부터 데이터를 가져옴

 

나는 2번 서버 개발을 맡게 되었는데 3번 서버로 날린 여러 API 호출 결과들을 빠르게 조합해서 Front로 전달해야 했기 때문에 비동기로 해당 작업들을 수행하는 것이 좋겠다고 판단하였다. 

 

기존에는 RestTemplate과 모던 자바의 CompletableFuture 조합으로 비동기를 구현했었지만, 이번 프로젝트에는 새로운 기술을 익혀보고 싶어 WebFlux 를 이용해 보기로 했다. 그런데 WebFlux가 어떤건지 대충은 알았지만 정확하게 무엇인지, 구현하는 방법도 모른다는 문제점이.....

 

이제부터 리액트 프로그래밍에 대해 알아보고, 이것을 신규 프로젝트에 적용할 수 있는지, 할 수 있다면 어떻게 할 수 있는지 알아보도록 한다.

 

어려움이 닥칠 때 마다 찾는 '모던 자바 인 액션'

다시 이 책을 펼쳤고, 먼저 리액티브 프로그래밍에 대해 자세히 공부해보는 것이 좋겠다고 생각하여 이렇게 포스팅을 하게 되었다. WebFlux를 도입하기 전 "Reactor"에 대한 이해가 선행되어야 한다 해서 "Reactor"에 대해 먼저 알아봤다.

 

결론은 신규 프로젝트에 적용을 하진 못했지만, 관련해서 알아봤던 내용들을 정리해보겠다.

 

 

Reactive Programming

리액티브 시스템과 리액티브 프로그래밍은 다른 개념으로 리액티브 시스템은 시스템 레벨에서 아키텍트와 DevOps를 위한 생산성을 제공한다. 리액티브 프로그래밍은 리액티브 시스템의 구현 수준의 하위집합으로, 내부 로직 및 데이터 흐름 관리를 위한 구성요소 단계에서 높은 생산성을 제공한다. 

  • 정의
    • 데이터 흐름과 전달에 관한 프로그래밍 패러다임
    • 리액티브 스트림을 사용하는 프로그래밍
      • 리액티브 스트림 ? 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술
      • 역압력 (Back Pressure) ? 발행-구독 프로토콜에서 이벤트 스트림의 구독자가 발행자가 이벤트를 처리하는 속도보다 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치
  • 왜 이런 패러다임이 생겼나?
    • 적은 수의 스레드로 동시성을 처리하고 더 적은 하드웨어 리소스로 확장할 수 있는 비차단 웹 스택이 필요하다는 것
      • Servlet 3.1에서 non-blocking I/O를 위한 API를 제공했지만 이 API의 사용은 synchronous 이거나 blocking인 ServletAPI와 멀어지는 개념이었다.
    • Java 8에 람다 표현식이 추가됨
      • Java 8에 람다 표현식을 추가되면서 Fuctional Programming이 가능하게 되었고, 이것은 비동기 논리의 선언적 구성을 허용하는 non-blocking 애플리케이션에 대한 이점입니다.
    • 빅데이터/모바일부터 클라우드 기반 클러스터에 이르는 다양한 환경/밀리초 단위의 응답시간을 기대하는 사용패턴에서의 요구사항을 만족시켜 주기 위해
    • 다양한 시스템과 소스에서 들어오는 데이터 항목 스트림을 비동기적으로 처리해서 문제를 해결한다. 

 

  • Reactive의 속성 4가지 Keyword (핵심원칙)
    • 반응성: 일정하고 예상할 수 있는 빠른 반응시간
    • 회복성: 장애가 전파되지 않고 복구된다.
    • 탄력성: 작업량의 변화와 무관하다. (병목현상) -> 작업 부하 발생시 관련 컴포넌트에 할당된 자원 수 늘린다.
    • 메시지기반 : 컴포넌트 간의 약한 결합, 고립, 위치 투명성이 유지되도록 시스템은 비동기 메시지 전달에 의존
  • JDK에서 리액티브 프로그래밍을 제공하는 기술 
    • RxJava
    • Project Reactor
    • Spring Framework 5.0
  • RxJava
    • 자바로 리액티브 프로그래밍을 할 수 있는 라이브러리
      • 비동기 프로그래밍과 함수형 프로그래밍 기법을 활용한다
    • 복잡한 비동기 프로그램을 쉽게 개발할 수 있게 해준다. 

 

java.util.concurrent.Flow 클래스 

 

  • 자바 9에서는 리액티브 프로그래밍을 제공하기 위해 Flow클래스를 추가했다. 
    • 리액티브 스트림 프로젝트의 표준에 따라 발행-구독 모델을 지원
    • Akka, RxJava등의 리액티르 라이브러리는 Flow 클래스에 정의된 인터페이스를 구현한다. 
  • Flow클래스의 인터페이스
    • Publisher : 항목 발행
    • Subscriber : Publisher가 발행한 항복을 한개 또는 여러개 소비
    • Subscription : 위 소비 과정을 정적 메서드로 관리 / Publisher와 Subscriber 사이의 제어 흐름, 역압력 관리 
    • Processor : 프로세서는 게시자와 구독자 사이에 있는 구성 요소로써 Publisher에 시그널을 요청하거나 아이템을 Subscriber에게 Push함

https://ozenero.com/java-9-flow-api-example-processor

public interface Subscriber<T> {
    void onSubscribe(Subscription s); // 항상 처음 호출됨
    void onNext(T t); // 2번째로 호출되는 데 여러 번 호출될 수 있음
    void onComplete(); // 더 이상의 데이터가 없고 종료됨을 알림
    void onError(Throwable t); // Publisher에 장애가 발생했을 때 호출함
}

Subscriber 인터페이스는 Publisher가 관련 이벤트를 발행할 때 호출할 수 있도록 콜백 메서드 네개를 정의한다. 

Subscriber가 Publisher에 자신을 등록할 때 Publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달한다. Subscription 인터페이스는 메서드 두 개를 정의한다. Subscription은 첫 번째 메서드로 Publisher에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알릴 수 있다. 두 번째 메서드로는 Subscription을 취소, 즉 Publisher에게 더 이상 이벤트를 받지 않음을 통지한다.

 

public interface Subscription { 
    void request(long n);
    void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

플로 API를 사용하는 리액티브 애플리케이션의 생명주기

 

https://ozenero.com/java-9-flow-api-example-processor

  • 게시자는 프로세서와 함께 작동하도록 구독을 정의
  • 프로세서는 구독자와 함께 작동하도록 자체 구독을 정의
  • Subscriber::onNext() 메서드를 사용하여 게시자는 항목을 프로세서에 푸시하고 프로세서는 항목을 구독자에 푸시
  • Subscription::request() 메서드를 사용하여 프로세서는 게시자에게 항목을 요청, 구독자는 프로세서에 항목을 요청
  • 게시자 및 프로세서는 멀티스레딩을 위한 실행자를 정의하고, request() 및 onNext() 메서드가 비동기적으로 작동
  • Processor는 Subscriber와 Processor가 요청한 항목의 수요량이 다른 경우 항목을 저장하기 위한 데이터 버퍼 가짐

 

자바 9 플로 API를 직접 이용하는 첫 리액티브 애플리케이션 ( from 모던 자바 인 액션)

 

- TempInfo. 원격온도계로 0에서 99사이 온도를 보고

- TempSubscriber 각 도시에 설치된 센서가 보고한 온도 스트림 출력

 

import java.util.Random;

public class TempInfo {

    public static final Random random = new Random();
    
    private final String town;
    private final int temp;
    
    public TempInfo(String town, int temp) {
    	this.town = town;
        this.temp = temp;
    }
    
    public static TempInfo fetch(String town) {
    	if (random.nextInt(10) == 0) { // 10분의 1 확률로 작업 실패
        	throw new RuntimeException("Error!");
        }
        return new TempInfo(town, random.nextInt(100)); // 임의의 화씨 온도를 반환
    }
    
    @Override
    public String toString() {
    	return town + " : " + temp;
    }
    
    public int getTemp() { 
    	return temp;
    }
    
    public String getTown() {
    	return town;
    }
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TempSubscription implements Subscription {

    private final Subscriber<? super TempInfo> subscriber;
    private final String town;
    private static final ExecutorService executor = Executors.newSingleThreadExecutor();

    public TempSubscription (Subscriber<? super TempInfo> subscriber, String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        executor.submit( () -> {
            for (long l = 0L; l < n; l++) {
                try {
                    subscriber.onNext(TempInfo.fetch(town)); // 현재 온도를 subscriber로 전달
                } catch (Exception e) {
                    subscriber.onError(e); // 실패하면 subscriber로 에러 전달
                    break;
                }
            }
        });
    }

    @Override
    public void cancel() {
        subscriber.onComplete(); // 구독취소되면 subscriber로 전달
    }
}

ExecutorService는 비동기 모드에서 실행 중인 작업을 단순화하는 JDK API

일반적으로 ExecutorService는 스레드 풀과 여기에 태스크를 할당하기 위한 API를 자동으로 제공합니다.

 

 

public class TempSubscriber implements Flow.Subscriber<TempInfo> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(TempInfo tempInfo) {
        System.out.println(tempInfo);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        System.err.println(t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

 

Processor는 Subscriber이며 동시에 Publisher다.

Processor의 목적은 Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것이다. 

 

화씨를 섭시로 변환하는 Processor

import java.util.concurrent.Flow.*;

public class TempProcessor implements Processor<TempInfo, TempInfo> {

    private Subscriber<? super TempInfo> subscriber;

    @Override
    public void subscribe(Subscriber<? super TempInfo> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onNext(TempInfo temp) {
        subscriber.onNext(new TempInfo(temp.getTown(),
                (temp.getTemp() - 32) * 5 / 9));
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }

Publisher의 subscribe 메서드는 업스트림 Subscriber를 Processor로 등록하는 동작을 수행한다. 

 

import java.util.concurrent.Flow.Publisher;

public class Main {
    public static void main(String[] args) {
        getCelsiusTemperatures("New York").subscribe(new TempSubscriber());
    }

    private static Publisher<TempInfo> getCelsiusTemperatures(String town) {
        return subscriber -> {
            TempProcessor processor = new TempProcessor();
            processor.subscribe(subscriber);
            processor.onSubscribe(new TempSubscription(processor, town));
        };
    }
}

 

로그를 찍어보며 알아본 메서드 콜 순서

 

  • Publisher 생성
  • Processor 생성자 동작- Subscriber를 set해줌
  • Subscription 생성자 동작- Subscriber와 전달받은 매개변수(town)를 set해줌 (NewYork의 Subscription이 만들어짐)
  • Processor의 onSubscribe() 메소드 동작- Subscriber의 onSubscribe() 메소드 호출/매개변수로 subscription전달
  • Subscriber 생성자 동작- 전달받은 subscription 을 set해줌

아래 동작 에러 발생할 때까지 반복 수행

  • Subscription 의 request() 메소드 동작 - Subscriber의 onNext() 메소드 호출 (여기
  • Processor의 onNext() 메소드 동작- Subscriber의 onNext() 메소드 호출
  • Subscriber의 onNext() 메소드 동작- Subscription 의 request() 메소드 호출 

 


  • Subscription의 request() 메소드 동작 중 에러 발생
  • Processor의 onError(Throwable t) 메소드 동작 -  subscriber.onError(t) 메소드 호출함
  • Subscriber의 onError(Throwable t) 메소드 동작

 

구독할 아이의 Subscription을 만들어서 Subscriber에게 전달하면 이제 구독-발행 준비가 완료된다.

Subscriber의 onNext() 메소드가 계속 호출되고, 동작 완료되면 다시 Subscription의 request() 메소드를 호출시킨다. 

로직은 Subscriber의 onNext() 메소드에 쓰면 될 것 같다. 

 

To be continued...

 

 

출처 

모던자바인액션 (라울-게이브리얼 우르마)

https://ozenero.com/java-9-flow-api-example-processor

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

https://brunch.co.kr/@springboot/152

https://12bme.tistory.com/570

https://velog.io/@ehdrms2034/Java-89-Reactive-Java

 

 

 

- TO READ

https://ckddn9496.tistory.com/158

 

 

 

 

 

+ Recent posts