사수님이 다른 사람의 코드도 많이 봐야 된다고 말씀해 주셨는데

백엔드 개발자가 팀에 2명 밖에 없기도 하고, 뭔가 동기부여도 잘 안되고 해서

거의 혼자만의 개발 세계에서 코드를 써내려왔던 것 같다. 

그러다가 이제 또 새로운 개발 프로젝트에 착수하게 되었는데 

뭔가... 이렇게 가다간 발전도 없이 계속 같은 코드만, 같은 기술만 사용할 것 같단 위기감이 들었다.

 

그래서 요즘 컨퍼런스도 좀 챙겨 보고 핫 하다는 기술도 좀 기웃기웃 거리고 있다.

Reactive Kafka가 무엇인지 알기 위해 NHN FORWARD 2019의 세션 영상을 보다가 샘플 코드를 얻게 되었는데

Kafka는 제쳐두고 일단 코드가 굉장히 깔끔하고 구조도 내가 써먹을게 많겠다 싶었다.

이참에 다른 사람의 소스 코드도 좀 봐야지란 생각에 우선 코드 구조부터 파악하기로 했다.

 

카프카에 대한 개념은 이전 포스팅에서 다뤘습니다 (https://modimodi.tistory.com/72)

 

 

https://github.com/EleganceLESS/nhn-forward-2019

 

GitHub - EleganceLESS/nhn-forward-2019

Contribute to EleganceLESS/nhn-forward-2019 development by creating an account on GitHub.

github.com

 

아래 이미지는 Class Diagram이 아니라는 것을 명확히 하고 싶다 ^^;

소스코드 이해를 위해서 파워포인트로 나름 클래스 구조를 그려봤는데 이게 나의 최선이라는 것에 좌절감이ㅋㅋ

 

Kafka는 제쳐두고 우선 참고할 만한 코드 부분에서 인상적이었던 부분을 좀 정리해보도록 하겠다. 

  • 우선 abstract 클래스를 많이 사용한 것이 눈여겨볼 부분이다. 
    • AS-IS 관계를 abstract 클래스를 활용해 더 세밀하게 컨트롤했다.
  • DemoController에는 각 자식 Controller에서 공통적으로 사용하는 /start, /stop EndPoint가 존재한다.
    • DemoController는 DemoService를 주입받고, DemoService의 start(), stop() 메서드를 활용한다.
    • 각 Controller에서는 메시지를 소비하고 정지하는 방법은 같지만 메시지를 생성하는 방법은 다른 Service들을 Injection 시킨다. 
  • DemoService는 추상클래스로 consume()이라는 추상 메서드를 가진다. 
    • DemoService의 다른 메서드는 자식 Service에서 사용되지만 consume() 추상 메서드는 서비스별로 다르기에 이 구조를 만든 것으로 보인다. 
  • DemoService의 consume()을 구현한 클래스 또한 추상 클래스이다. DemoService의 자식 클래스는 OperatorDemoService, SubscriberDemoService이고 각각 consumer(), getSubscriber() 추상 메서드를 포함하고 있다.
    • 메서드 구현을 자식 클래스에 맡기는 데 사용은 부모 클래스에서 한다.
      • 예를 들어 SubscriberDemoService의 getSubscriber() 메서드는 추상 메서드이고, 해당 클래스의 cosume() 메서드에서 이 getSubscriber() 메서드가 사용되는데 getSubscriber() 메서드 구현은 자식이 담당한다.

 

각 클래스 별로 공통적으로 사용하는 코드가 있을 때는 AS-IS 관계인지에 따라 상속을 이용했고, AS-IS 관계가 아닌 경우 interface의 default 기능을 활용했다. 그리고 부모-자식 관계를 1 depth 이상으로 가져가려면 그 관계에 대해 치밀하게 생각해봤다는 것이겠지..? 일단 코드 작성 전에 클래스 간의 관계를 고려해 설계부터 잘 하고 개발에 착수해야 겠단 생각이 많이 들었던 코드이다. 

 

테스트 코드도 눈여겨 볼게 많다. 

 

Test Code

 

Controller 단

@Test
    public void startAndStopTest() {
        Step0Service service = mock(Step0Service.class);
        when(service.start()).thenReturn(Mono.just("START"));
        when(service.stop()).thenReturn(Mono.just("STOP"));

        Step0Controller controller = new Step0Controller(service);

        WebTestClient testClient = WebTestClient.bindToController(controller)
                .build();

        testClient.get().uri("/step0/start")
                .exchange()
                .expectBody(String.class)
                .isEqualTo("START");

        testClient.get().uri("/step0/start")
                .exchange()
                .expectBody(String.class)
                .isEqualTo("Already Running");

        testClient.get().uri("/step0/stop")
                .exchange()
                .expectBody(String.class)
                .isEqualTo("STOP");

        testClient.get().uri("/step0/stop")
                .exchange()
                .expectBody(String.class)
                .isEqualTo("Not Running Now");
    }

Repository단

@Test
public void notifyTest() {
    StepVerifier.withVirtualTime(() -> repository.notify(Tuples.of(1, "홍길동")))
            .thenAwait(Duration.ofSeconds(3))
            .expectNext(Tuples.of("홍길동", true))
            .verifyComplete();
}
@Test
public void getReceiversTest() {

    StepVerifier.create(repository.getReceivers(1))
            .verifyComplete();

    StepVerifier.create(repository.getReceivers(2))
            .expectNext(Tuples.of(2, "조조"))
            .verifyComplete();

    StepVerifier.create(repository.getReceivers(3))
            .expectNext(Tuples.of(3, "유비"))
            .verifyComplete();

    StepVerifier.create(repository.getReceivers(4))
            .expectNext(Tuples.of(4, "조조"))
            .expectNext(Tuples.of(4, "손권"))
            .verifyComplete();
}

Service단

@Test
public void step2ConsumerTest() {
    ReceiverOffset offset = mock(ReceiverOffset.class);
    doNothing().when(offset).acknowledge();

    ReceiverRecord<String, String> record1 = mock(ReceiverRecord.class);
    when(record1.key()).thenReturn("1");
    when(record1.value()).thenReturn("1");
    when(record1.receiverOffset()).thenReturn(offset);

    ReceiverRecord<String, String> record2 = mock(ReceiverRecord.class);
    when(record2.key()).thenReturn("2");
    when(record2.value()).thenReturn("2");
    when(record2.receiverOffset()).thenReturn(offset);
}
 @Test
public void samplingTest() {
    Step2Service service = new Step2Service(null, null);

    StepVerifier.withVirtualTime(() -> Flux.just(1, 1, 1, 1, 1)
            .groupBy(Function.identity())
            .flatMap(service::sampling))
            .thenAwait(Duration.ofSeconds(5))
            .expectNext(1)
            .verifyComplete();
}

 

이 프로젝트 코드를 분석하기로 마음 먹은 것이 테스트 코드를 보고 난 다음이다.

mock을 활용해 단위 테스트를 진행하고, StepVerifier나 doNothing() ..? 이건 내가 한 번도 사용해보지 않은;

이미 작성해 놓은 테스트 코드를 다 뜯어 고치고 싶어서 시간 될 때 마다 조금씩 뜯어 고치고는 있는데

이게 한 두개 건드리다 보니 정말 끝도 없다^^;;

사수님이 말씀해주셨듯 이미 작성된 코드 건들다 보면 한도 끝도 없으니

새로 개발할 것들에 새로 익힌 기술들을 사용해보라는 말씀을 명심해야겠다... 

 

 

 

이 포스팅은 모던 자바 인 액션 책에 나와있는 내용을 바탕으로 작성하였습니다.

 

이미 어느 정도 익숙하고, 많이 사용해왔던 모던 자바의 기법이지만 언제나 그렇듯 기본과 원리가 중요하다는 사실을

인지하면서 프로젝트가 마무리 되는 이 시점에 한 번 더 정리해보려고 한다.

기존에는 책을 대충 훑고 난 다음 사용했기 때문에 이번에는 좀 정독을 하면서 중요 부분은 포스팅해보려고 한다. 

 

람다란 무엇인가?

  • 메서드로 전달할 수 있는 익명함수를 단순화한 것 (이름없음)
  • 기본 문법
    • (parameters) -> expression
    • (parameters) -> { statements; } 

 

어디에 어떻게 람다를 사용할까?

  • 함수형 인터페이스를 인수로 받는 메서드에 람다 표현식을 사용할 수 있다. 
    • 함수형 인터페이스는 정확히 하나의 추상 메서드를 지정하는 인터페이스이다. 
  • 람다 표현식으로 함수형 인터페이스의 추상 메서드 구현을 직접 전달할 수 있으므로 전체 표현식을 함수형 인터페이스의 인스턴스로 취급
// 함수형 인터페이스 예시
public interface Predicate<T> {
	boolean test(T t);
}

public interface Comparator<T> {
	int compare(T o1, T o2);
}
public interface Runnable {
	void run();
}

public void execute(Runnable r) {
	r.run();
}

execute(() -> {});

 

람다를 어떻게 활용할 수 있을까?

  • 실행어라운드 패턴에 활용할 수 있다. 
    • 예를 들어 데이터베이스의 파일 처리를 보면 자원 열고 처리하고 자원 닫는 순서로 이루어 진다. 이 때 실제 자원을 처리하는 코드를 설정과 정리 두 과정이 둘러싸는 형태를 갖는데 이런 형태를 바로 실행 어라운드 패턴이라고 부른다. 
    • 초기화/준비 코드
      작업 A
      정리/마무리 코드
    • 초기화/준비 코드
      작업 B
      정리/마무리 코드
    • 중복되는 준비 코드와 정리 코드가 작업 A와 작업 B를 감싸고 있다. 
// 한 줄씩 읽는 기존 코드
public String work() throws IOException {
	try (BufferedReader br = new BufferedReader(new FileReader("hi.txt"))) {
    	return br.readLine();
    }
}

// 근데 상황에 따라 두 줄을 읽어야 된다는 요구사항이 들어왔다.
// 따라서 br.readLine(); 이 부분의 동작을 함수형 인터페이스를 사용하는 것으로 바꾼다.
public interface BufferedReaderProcessor {
	String process(BufferedReader b) throws IOException;
}

public String work(BufferedReaderProcessor p) throws IOException {
	try (BufferedReader br = new BufferedReader(new FileReader("hi.txt"))) {
    	return p.process(br);
    }
}

String result = work((BufferedReader br) -> br.readLine());
String result2 = work((BufferedReader br) -> br.readLine() + br.readLine());

 

함수형 인터페이스와 람다의 활용

  • 다양한 람다 표현식을 사용하려면 공통의 함수 디스크립터를 기술하는 함수형 인터페이스 집합이 필요하다. 
    • 함수형 인터페이스의 추상 메서드 시그니처를 함수 디스크립터(fuction descriptor)라고 한다. 
  • 자바 API는 Comparable, Runnable, Callable, Predicate, Consumer, Function 등의 함수형 인터페이스를 제공하고 있고, 이 인터페이스에 대한 자세한 설명은 이 포스팅에서 생략하도록 한다. (이와 관련된 포스팅은 따로 있음)
@FunctionalInterface
public interface Consumer<T> {
	void accept(T t);
}

@FunctionalInterface
public interface Function<T,R> {
	R apply (T t);
}

@FunctionalInterface
public interface Predicate<T> {
	boolean test(T t);
}
// Predicate 활용 예시
Predicate<String> nonEmptyStringPredicate = (String s) -> !s.isEmpty();
List<String> nonEmpty = filter(listOfString, nonEmptyStringPredicate);

// Consumer 활용 예시
public <T> void forEach(List<T> list, Consumer<T> c) {
	for (T t: list) {
    	c.accept(t);
    }
}

forEach(Arrays.asList(1,2,3,4,5), (Integer i) -> System.out.println(i));
@FunctionalInterface
public interface Function<T> {
	R apply (T t);
}

public <T,R> List<R> map(List<T> list, Function<T,R> f) {
	List<R> result = new ArrayList<>();
    for (T t: list) {
    	result.add(f.apply(t));
    }
    return result;
}

List<Integer> list = map(Arrays.asList("lamdas", "in", "action"),
							(String s) -> s.length()
                            );

 

기본형 특화

자바의 모든 형식은 참조형(Object, Integer, List) 아니면 기본형(int, double, byte, char)가 있다.

제네릭 파라미터 Consumer<T> 의 T 같은 데에는 참조형만 사용할 수 있다는

제네릭의 내부 구현 방식으로 인해 자바에서는 기본형을 참조형으로 박싱하는 기능을 제공한다. 

 

그리고 박싱과 언박싱이 자동으로 이뤄지는 오토박싱 기능도 제공하는데 (int가 Integer로)

이러한 변화과정은 비용이 소모된다. 박싱한 값은 기본형을 감싸는 Wrapper이며 이것은 Heap에 저장된다. 

따라서 박싱한 값이 메모리를 더 소비하기 때문에 기본형을 가져올 때에도 메모리를 탐색해야 하는 과정을 거친다

 

이런 이유로 오토박싱 동작을 피할 수 있도록 특별한 함수형 인터페이스를 제공한다. 

Predicate<Integer> -> IntPredicate 이 외 DoublePredicate, IntConsumer, LongBinaryOperator, IntFuntion가 있다. 

public interface IntPredicate {
	boolean test(int i);
}

 

형식 추론

자바 컴파일러는 람다 표현식이 사용된 콘텍스트를 이용해서 관련된 함수형 인터페이스를 추론해낸다!

그리고 이런 추론은 개발자가 람다 문법에서 파라미터 형식을 생략하더라도 추론이 가능하기 때문에 

아래와 같이 두 가지 형태로 사용이 가능하며 개발자 스스로 어떤 코드가 가독성을 향상 시킬 수 있는지 결정해야 한다.

Comparator<Apple> c = (Apple a1, Apple a2) -> a1.getWeight().compareTo(a2.getWeight());
Comparator<Apple> c = (a1, a2) -> a1.getWeight().compareTo(a2.getWeight());

 

람다 캡처링

람다 표현식은 인수를 자신의 바디 안에서만 사용했다. 하지만 자유변수(외부에서 정의된 변수)를 활용할 수 있고,

이것을 람다캡처링(capturing lambda)라고 부른다. 

int portNumber = 8289;
Runnable r = () -> System.out.println(portNumber);

하지만 이에는 제약사항이 있는데

지역 변수가 명시적으로 final로 선언되거나, final은 아니지만 실제론 final처럼 사용되어야 한다는 것이다. 

위에서 portNumber가 다시 portNumber=8080으로 두 번 값을 할당하면 컴파일 에러가 발생하게 될 것이다. 

 

왜 이런 제약사항이 있는가?

인스턴스 변수는 힙에 저장되고, 지역 변수는 스택에 위치하는데 람다가 만약 스레드에서 실행된다고 하면 변수를 할당한 스레드가 사라져서 변수 할당이 해제되었는데도 람다를 실행하는 스레드는 해당 변수에 접근하려 할 것이다. 따라서 자바 구현에서는 원래 변수에 접근을 허용하지 않고, 자유 지역 변수의 복사본을 제공하게 되었고 이 복사본의 값이 바뀌지 않아야 하므로 지역변수에는 값을 한 번만 할당해야 하는 것이다. (병렬화에도 관련이 있음)

 

클로저

함수의 비지역 변수를 자유롭게 참조할 수 있는 함수의 인스턴스

람다와 익명 클래스 모두 메서드의 인수로 전달될 수 있으며 자신의 외부 영역의 변수에 접근할 수 있다. 다만 람다와 익명클래스는 람다가 정의된 메서드의 지역 변수의 값은 바꿀 수 없다. 

 

메서드 참조

메서드를 어떻게 호출해야 하는지 설명을 참조하기 보다는 메서드명을 직접 참조해서 가독성을 높인다. 예를 들어 Apple::getWeight는 Apple클래스에 정의된 getWeight의 메서드 참조이다. 결과적으로 메서드 참조는 람다 표현식 
(Apple a) -> a.getWeight()을 축약한 것이다. 

 

방법

1. 정적 메서드 참조 -> Integer::parseInt

2. 다양한 형식의 인스턴스 메서드 참조 -> String::length

3. 기존 객체의 인스턴스 메서드 참조 -> t::getValue (Transaction 객체를 할당받은 t 지역변수의 getValue 메서드)

 

예시

ToIntFuntion<String> stringToInt = (String s) -> Integer.parseInt(s);

  ->  Funtion<String, Integer> stringToInteger = Integer::parseInt;

BiPredicate<List<String>, String> contains = (list, element) -> list.contains(element);

  ->   Predicate<List<String>, String> contains = List::contains;

Predicate<String> startsWithNumber = (String string) -> this.startsWithNumber(string);

  ->   Predicate<String> startsWithNumber = this::startsWithNumber;

 

생성자 참조

ClassName::new처럼 클래스명과 new 키워드를 이용해 기존 생성자의 참조를 만들 수 있다. 

Supplier의 () -> Apple과 같은 시그니처를 갖는 생성자가 있다고 가정하면 아래와 같이 사용 가능하다. 

Supplier<Apple> c1 = Apple::new;
Apple a1 = c1.get();

Function을 이용하면 아래와 같이 사용할 수 있다.

Function<Integer, Apple> c2 = Apple::new;
Apple a1 = c2.apply(110);

인스턴스화하지 않고도 생성자에 접근할 수 있는 기능을 다양한 상황에 응용할 수 있다. 

static Map<String, Function<Integer, Fruit>> map = new HashMap<>();
static {
	map.put("apple", Apple::new);
    map.put("orange", Orange::new));
}

public static Fruit giveMeFruit(String fruit, Integer weight) {
	return map.get(fruit).apply(weight);
}

 

람다 표현식을 조합할 수 있는 유용한 메서드

자바 8 API의 몇몇 함수형 인터페이스는 디폴트 메서드를 활용해 다양한 유틸리티 메서드를 제공하고 있다. 

 

Comparator

역정렬 reversed(), 두번째 비교자 thenComparing 이 있다.

inventory.sort(comparing(Apple::getWeight)
		 .reversed()
		 .thenComparing(Apple::getCountry));

Prediacte 

negate, and, or 세가지 메서드 제공한다. 

negate는 특정 프레디케이트를 반전시킬 때 예를 들어 '빨간색이 아닌 사과' 같은..

Predicate<Apple> notRedApple = redApple.negate();

and와 or은 말그대로이다.

Predicate<Apple> redAndHeavyAppleOrGreen = 
	redApple.and(apple -> apple.getWeight() > 150)
    		.or(apple -> GREEN.equals(a.getColor()));

Function

Function인터페이스를 반환하는 andThen, compose 두 가지 디폴트 메서드를 제공한다. 

andThen은 주어진 함수를 먼저 적용한 결과를 다음 함수의 입력으로, compose메서드는 인수로 주어진 함수를 먼저 실행한 다음에 그 결과를 외부함수의 인수로 제공한다. 즉, f.andThen(g)에서 andThen 대신에 compose를 사용하면 g(f(x))가 아니라 f(g(x))라는 수식이 된다. 

Funtion<Integer, Integer> f = x -> x + 1;
Funtion<Integer, Integer> g = x -> x * 2;
Funtion<Integer, Integer> h = f.componse(g);
int result = h.apply(1); // 3을 반환한다.

' > 모던자바인액션' 카테고리의 다른 글

Java 함수형 프로그래밍에 대해 알아보자.  (0) 2022.06.16

현재 오픈 예정인 Admin 서비스의 경우 사용자 활동에 대한 로그를 S3에 남겨야 한다.

처음에 욕심으로 RequestBody에 담기 내용도 올려야지~ 하고는 Interceptor에서 HttpServletRequest에 담긴

Body 내용을 기록했는데 Body 데이터가 Controller단에서 사용이 불가한 것 아니겠는가.

 

HttpServletRequest 인터페이스는 본문을 읽기 위해 getInputStream() 메소드를 갖고 있는데 기본적으로

이 InputStream의 데이터는 한 번만 읽을 수 있도록 설계가 되어 있다.

해당 데이터를 재활용하려면 Wrapper를 이용해야 했는데 개인적인 생각으로 이는 Too much라 생각되었고,

그냥 URL에 담겨있는 ip, method, url 같은 간단한 정보만 기록하는 것으로 일보 후퇴하였다.

이게 데이터를 어쨌든 복사하는 개념이니 Body에 데이터가 많은 경우

매 Request마다 이를 수행하는 것은 성능에도 좋을 수 없겠다는 판단이 들어서이기도 하다.

(never ever 귀찮아서가 아니었음)

 

그로부터 일년이 지났을까..ㅋㅋ 팀장님께서 다른 서버는 몰라도

Admin 서버의 경우에는 Body 데이터를 기록해야 된다는 의견을 주셨고, 결국 이를 수행하기로......

(그래, Admin은 성능보다는 정확성과 기록이 중요한거 아니겠어?!)

 

나는 아래와 같이 설정하였고, 잘 수행되는 것까지 확인하였다.

파일과 같은 멀티파트 타입의 데이터를 보내는 경우에는 

StandardMultipartHttpServletRequest가 Cast 불가하다는 에러를 발생시키기 때문에 

"application/json" 콘텐트 타입만 처리하도록 만들었다. 

 

 

Filter단

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;


@Slf4j
@Component
public class InitSettingFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) {
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        CachedBodyHttpServletWrapper cachedBodyHttpServletWrapper = new CachedBodyHttpServletWrapper(httpRequest);
        chain.doFilter(cachedBodyHttpServletWrapper, response);
    }

    @Override
    public void destroy() {
    }
}

 

HttpServletRequestWrapper구현

import lombok.SneakyThrows;
import org.springframework.util.StreamUtils;

import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class CachedBodyHttpServletWrapper extends HttpServletRequestWrapper {

    private byte[] cachedBody;

    public CachedBodyHttpServletWrapper(HttpServletRequest request) throws IOException {
        super(request);
        InputStream requestInputStream = request.getInputStream();
        this.cachedBody = StreamUtils.copyToByteArray(requestInputStream);
    }

    @Override
    public ServletInputStream getInputStream() throws IOException {
        return new CachedBodyServletInputStream(this.cachedBody);
    }

    @Override
    public BufferedReader getReader() throws IOException {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(this.cachedBody);
        return new BufferedReader(new InputStreamReader(byteArrayInputStream, "UTF-8"));
    }

    /*
    Inner Class
     */
    public class CachedBodyServletInputStream extends ServletInputStream {

        private InputStream cachedBodyInputStream;

        public CachedBodyServletInputStream(byte[] cachedBody) {
            this.cachedBodyInputStream = new ByteArrayInputStream(cachedBody);
        }

        @SneakyThrows
        @Override
        public boolean isFinished() {
            return cachedBodyInputStream.available() == 0;
        }

        @Override
        public boolean isReady() {
            return true;
        }

        @Override
        public void setReadListener(ReadListener readListener) {
        }

        @Override
        public int read() throws IOException {
            return cachedBodyInputStream.read();
        }
    }
}

 

Interceptor단

preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) 메소드에 아래의 로직을 추가하였다. 

if (request.getContentType() != null && request.getContentType().contains("application/json")) {
    final CachedBodyHttpServletWrapper cachingRequest = (CachedBodyHttpServletWrapper) request;
    String requestBody;
    if ((requestBody = cachingRequest.getReader().readLine()) != null) {
        // S3로 업로드!
    }
} else {
	// Body 제외한 상태로 S3에 업로드!
}

 

끝!

 

참고

https://www.baeldung.com/spring-reading-httpservletrequest-multiple-times

 

출처 

https://www.baeldung.com/java-functional-programming

책 - 모던자바인액션 

 

1. 소개

 

이 튜토리얼에서 우리는 함수형 프로그래밍 패러다임의 핵심 원칙과 자바 프로그래밍 언어에서 이를 실행하는 방법을 이해하고, 고급 함수형 프로그래밍 기술 중 일부를 다뤄볼 것입니다.

 

2. 함수형 프로그래밍이란?

 

함수형 프로그래밍은 기본적으로 수학의 함수처럼 컴퓨터 프로그램을 작성하는 스타일을 말합니다. 그렇다면 수학에서 함수란 무엇일까요? 함수는 입력 집합을 출력 집합에 연결하는 표현식인데, 여기서 중요한 것은 함수의 출력은 입력에만 의존한다는 것입니다. 



from 모던자바인액션 책


  • 함수란 수학적인 함수와 같다. 즉, 함수는 0개 이상의 인수를 가지며, 한 개 이상의 결과를 반환하지만 부작용이 없다.
  • 함수나 메서드는 지역 변수만을 변경해야 함수형이라 할 수 있다. 그러나 함수나 메서드에서 참조하는 객체가 있다면 그 객체는 불변 객체여야 한다. 즉, 객체의 모든 필드가 final이어야 한다는 것.
  • 함수형은 함수나 메서드가 어떤 예외도 일으키지 않아야 한다. 예외가 발생하면 return으로 결과를 반환할 수 없게 되는데 수학적 함수는 함수값에 대응하는 하나의 결과를 반환해야 한다. 
  • 선언적 프로그래밍과 함수형 프로그래밍의 비교하자면 선언적 프로그래밍은 '어떻게(how)에 집중하는 반편 함수형은 '무엇을'에 집중한다. 스트림 API를 이용해 구현 방법은 라이브러리가 결정한다. 

 

//선억적 프로그래밍

Transaction mostExpensive = transaction.get(0);

if (mostExpensive == null

throw new IllegalArgumentException("Empty list of trancsactions")

    

for (Transaction t : transactions.subList(1, transactions.size())) {

if (t.getValue() > mostExpensive.getValue()) {

     mostExpensive = t;

    }

}

 

//함수형 프로그래밍

Optional<Transaction> mostExpensive = transactions.stream().max(comparing(Transaction::getValue));


2.1. 프로그래밍 패러다임의 분류

 

대체로 프로그래밍 스타일은 명령형 프로그래밍 패러다임과 선언적 프로그래밍 패러다임으로 분류할 수 있습니다. 명령형 접근 방식은 프로그램을 최종 상태에 도달할 때까지 프로그램의 상태를 변경하는 일련의 명령문으로 정의합니다. 절차적 프로그래밍은 절차나 서브루틴을 사용하여 프로그램을 구성하는 명령형 프로그래밍의 한 유형입니다. 객체 지향 프로그래밍(OOP)으로 알려진 인기 있는 프로그래밍 패러다임 중 하나는 절차적 프로그래밍 개념을 확장한 것입니다. 대조적으로, 선언적 접근 방식은 일련의 명령문으로 제어 흐름을 설명하지 않고 계산 논리를 표현합니다. 간단히 말해서, 선언적 접근 방식의 초점은 프로그램이 달성해야 하는 방법보다는 프로그램이 달성해야 하는 것을 정의하는 것입니다. 함수형 프로그래밍은 선언적 프로그래밍 언어의 하위 집합입니다. 오늘날 인기 있는 프로그래밍 언어의 대부분은 범용 언어이므로 여러 프로그래밍 패러다임을 지원하는 경향이 있다는 것을 이해하는 것이 중요합니다.

 

3. 기본 원칙 및 개념

 

이 섹션에서는 함수형 프로그래밍의 몇 가지 기본 원칙과 이를 Java에 적용하는 방법을 다룹니다. 우리가 사용할 많은 기능이 항상 Java의 일부가 아니어서 함수형 프로그래밍을 효과적으로 실행하기 위해선 Java 8 이상을 사용하는 게 좋습니다.

 

3.1. 일급 및 고차 함수

 

프로그래밍 언어는 함수를 일급 시민(first-citizen)으로 취급하는 경우 일급 함수를 갖는다고 합니다. 함수를 변수에 할당하고, 다른 함수에 인수로 전달하고, 다른 함수의 값으로 반환하는 것이 포함됩니다. 이 속성을 사용하면 함수형 프로그래밍에서 고차 함수를 정의할 수 있습니다. 고차 함수는 함수를 인수로 받고 결과로 함수를 반환할 수 있습니다. 이를 통해 함수 구성 및 커링과 같은 함수 프로그래밍의 여러 기술을 추가로 사용할 수 있습니다.

  • 함수를 마치 일반값처럼 사용해서 인수로 전달하거나, 결과로 반환받거나, 자료구조에 저장할 수 있음을 의미
  • 자바 8이 이전 버전과 구별되는 특징 중 하나가 일급 함수를 지원한다는 점인데 :: 연산자로 메서드 참조를 만들거나 (int x) -> x + 1 같은 람다 표현식으로 직접 함숫값을 표현해서 메서드를 함숫값으로 사용할 수 있습니다. 

Collections.sort 메서드에 사용자 지정 비교기를 제공해야 한다고 가정해 보겠습니다. 아래의 코드는 우리가 볼 수 있듯이 이것은 지루하고 장황한 기술입니다. 

Collections.sort(numbers, new Comparator<Integer>() {

    @Override

    public int compare(Integer n1, Integer n2) {

        return n1.compareTo(n2);

    }

});

Collections.sort(numbers, (n1, n2) -> n1.compareTo(n2));

Java는 람다식을 Object로 취급하는데 이건 사실 Java의 진정한 1급 시민입니다. 

 

 

3.2. 순수 함수

 

순수 함수는 오직 argument만을 기반으로 값을 반환해야 하며 side-effect가 없는것을 강조합니다. 이것은 Java의 모든 best practice와 상당히 반대되는 것처럼 들릴 수 있습니다.

객체 지향 언어인 Java는 핵심 프로그래밍 방식으로 캡슐화를 권장합니다. 개체의 내부 상태를 숨기고 액세스 및 수정에 필요한 메서드만 노출되는 것을 권장합니다. 따라서 이러한 메서드는 엄격하게 순수한 함수는 아닙니다.

 

물론 캡슐화 및 기타 객체 지향 원칙은 권장 사항일 뿐 Java에서 바인딩되지 않습니다. 사실, 개발자들은 최근에 불변 상태 정의의 가치와 side-effect가 없는 메소드의 가치를 깨닫기 시작했습니다. 

Integer sum(List<Integer> numbers) {

    return numbers.stream().collect(Collectors.summingInt(Integer::intValue));

}

이제 이 방법은 받는 인수에만 의존하며 어떠한 부작용도 일으키지 않습니다.

부작용은 메서드의 의도된 동작을 제외하고 무엇이든 될 수 있습니다. 예를 들어, 부작용은 값을 반환하기 이전에 local 또는 global 상태를 업데이트하거나 데이터베이스에 저장하는 것을 말할 수 있습니다. 순수주의자들은 또한 로깅도 부작용으로 취급하지만 우리 모두는 우리 만의 룰이 있습니다. 하지만 실제로 결과를 데이터베이스에 저장해야 할 수도 있습니다. 함수형 프로그래밍에는 순수한 함수를 유지하면서 부작용을 처리하는 기술이 있습니다. 

 

 

3.3. 불변성

 

불변성은 함수형 프로그래밍의 핵심 원리 중 하나로, 엔터티를 인스턴스화한 후에는 수정할 수 없는 속성을 말합니다. 함수형 프로그래밍 언어에서는 이는 언어 수준(Language level)에서 지원되는데 자바에서는 변경할 수 없는 데이터 구조를 만들기 위해 스스로 결정해야 합니다. 

 

Java 자체는 String과 같은 몇 가지 기본 제공 불변 유형을 제공합니다. 이것은 주로 보안상의 이유입니다. 클래스 로딩에서 String을 많이 사용하고 해시 기반 데이터 구조에서 키로 사용하기 때문입니다. 기본 래퍼(primitive wrapper) 및 수학 유형과 같은 몇 가지 다른 기본 제공 불변 유형이 있습니다.

 

그러나 우리가 Java에서 생성하는 데이터 구조는 어떻습니까? 물론, 그것들은 기본적으로 변하지 않고,, 불변성을 달성하기 위해 몇 가지를 변경해야 합니다. final 키워드의 사용은 그 중 하나이지만 여기서 그치지 않습니다.

 

public class ImmutableData {

    private final String someData;

    private final AnotherImmutableData anotherImmutableData;

    public ImmutableData(final String someData, final AnotherImmutableData anotherImmutableData) {

        this.someData = someData;

        this.anotherImmutableData = anotherImmutableData;

    }

    public String getSomeData() {

        return someData;

    }

    public AnotherImmutableData getAnotherImmutableData() {

        return anotherImmutableData;

    }

}

 

public class AnotherImmutableData {

    private final Integer someOtherData;

    public AnotherImmutableData(final Integer someData) {

        this.someOtherData = someData;

    }

    public Integer getSomeOtherData() {

        return someOtherData;

    }

}

  1. 불변 데이터 구조의 모든 필드는 불변이어야 합니다.
  2. 이것은 모든 중첩 유형과 컬렉션 (컬렉션에 포함한 것들까지)에도 적용되야 합니다.
  3. 초기화를 위한 하나 이상의 생성자가 있어야 합니다.
  4. 부작용 없는 접근자 메서드만 있어야 합니다.

항상 정확하게 원칙을 고수하긴 어렵다. 특히 데이터 구조가 복잡해지면 복잡해질수록.. 그러나 여러 외부 라이브러리를 사용하면 Java에서 immutable data를 더 쉽게 사용할 수 있습니다. 예를 들면 Immutables 및 Project Lombok은 Java에서 변경할 수 없는 데이터 구조를 정의하기 위해 도움을 줍니다.

 

 

3.4. 참조 투명성

 

참조 투명성은 아마 이해하기 어려운 함수형 프로그래밍 원칙 중 하나일 것입니다. 그러나 개념은 매우 간단합니다. 함수 외부의 영향을 받지 않는 것입니다.  (= 함수의 결과는 입력 파라미터에만 의존하고, 함수 외 DB, 파일시스템, 원격 URL 등에서 데이터를 읽어 오지 않음 = 외부와 의존적이지 않은 코드 = 동일한 매개변수에 대해서는 항상 동일한 결과 = 예외가 없음) 이를 통해 고차 함수 및 지연 평가와 같은 함수형 프로그래밍에서의 몇 가지 강력한 기술을 사용할 수 있습니다. 이것을 더 잘 이해하기 위해 예를 들어 보겠습니다.

 

public class SimpleData {

    private Logger logger = Logger.getGlobal();

    private String data;

    public String getData() {

        logger.log(Level.INFO, "Get data called for SimpleData");

        return data;

    }

    public SimpleData setData(String data) {

        logger.log(Level.INFO, "Set data called for SimpleData");

        this.data = data;

        return this;

    }

}

이것은 Java의 일반적인 POJO 클래스이지만 이것이 참조 투명성을 제공하는지 확인하는 데 관심이 있습니다. 다음 진술을 관찰합시다.

String data = new SimpleData().setData("Baeldung").getData();

logger.log(Level.INFO, new SimpleData().setData("Baeldung").getData());

logger.log(Level.INFO, data);

logger.log(Level.INFO, "Baeldung");

logger에 대한 세 번의 호출은 의미상 동일하지만 참조적으로 투명하지 않습니다. 첫 번째 호출은 부작용을 생성하므로 참조적으로 투명하지 않습니다. 이 호출을 세 번째 호출에서와 같이 해당 값으로 바꾸면 로그가 누락됩니다.

 

두 번째 호출도 SimpleData가 변경 가능하므로 참조적으로 투명하지 않습니다. 프로그램의 어느 곳에서나 data.setData를 호출하면 해당 값으로 대체되기가 어렵습니다.

따라서 기본적으로 참조 투명성을 위해서는 순수하고 변경할 수 없는 함수가 필요합니다. 이것은 우리가 이미 앞에서 논의한 두 가지 전제 조건입니다. 참조 투명성의 흥미로운 결과로써 우리는 컨텍스트가 없는 코드를 생산합니다. 다시 말해서, 우리는 해당 코드를 어떤 순서와 컨텍스트로든 실행할 수 있으며, 이는 다른 최적화 가능성으로 이어지게 됩니다.

 

 

4. 함수형 프로그래밍 기법

 

앞에서 논의한 함수형 프로그래밍 원칙을 통해 함수형 프로그래밍의 이점을 얻기 위한 여러 기술을 사용할 수 있습니다. 이 섹션에서는 이런 인기 기술 중 일부를 다루고 이를 Java에서 구현하는 방법을 이해해 보겠습니다.

 

4.1. 기능 구성

 

Function Composition은 단순한 함수를 결합하여 복잡한 함수를 구성하는 것을 말합니다. 이것은 실제로 자바의 funcional 인터페이스를 통해 달성됩니다. 

 

일반적으로 단일 추상 메서드가 있는 모든 인터페이스는 functional 인터페이스로 사용할 수 있습니다. 그러므로 우리는 functional 인터페이스를 꽤나 쉽게 정의할 수 있지만 Java 8은 기본적으로 java.util.function 패키지에서 다양한 사용 사례에 대해 많은 기능적 인터페이스를 제공하고 있습니다.

 

이것을 더 잘 이해하기 위해 Function 인터페이스를 예로 들어보면 Function은 하나의 인수를 받아들이고 결과를 생성하는 간단하고 일반적인 기능의 인터페이스입니다.

 

또한 함수 구성에 도움이 되는 두 가지 기본 메서드인 compose 및 andThen을 제공합니다.

 

Function<Double, Double> log = (value) -> Math.log(value);

Function<Double, Double> sqrt = (value) -> Math.sqrt(value);

Function<Double, Double> logThenSqrt = sqrt.compose(log);

logger.log(Level.INFO, String.valueOf(logThenSqrt.apply(3.14)));

// Output: 1.06

Function<Double, Double> sqrtThenLog = sqrt.andThen(log);

logger.log(Level.INFO, String.valueOf(sqrtThenLog.apply(3.14)));

// Output: 0.57

이 두 가지 방법 모두 여러 함수를 단일 함수로 구성할 수 있게 하지만 다른 의미를 제공합니다. compose는 인수에 전달된 함수를 먼저 적용한 다음 호출된 함수를 적용하고 andThen은 역으로 동일한 작업을 수행합니다. 

 

여러 다른 함수형 인터페이스도 funcion composition에 사용될 여러 흥미로운 메소드를 가지고 있는데 Predicate 인터페이스의 기본 페소드 and, or, negate가 있습니다. 이러한 기능적 인터페이스는 단일 인수를 허용하지만 BiFunction 및 BiPredicate와 같은 2개의 인수도 가능하게 합니다. 

 

4.2. 모나드 (추가 리서치 필요 - 이해 X)

 

공식적으로 모나드는 일반적으로 프로그램을 구조화할 수 있는 추상화입니다. 따라서 기본적으로 모나드는 값을 래핑하고, 일련의 변환을 적용하고, 모든 변환이 적용된 값을 돌려 받을 수 있게 합니다. (a monad allows us to wrap a value, apply a set of transformations, and get the value back with all transformations applied.)물론 모든 모나드가 따라야 하는 세 가지 법칙(왼쪽 항등, 오른쪽 항등, 연관성)이 있지만 자세한 내용은 다루지 않겠습니다.

 

Java에는 Optional 및 Stream과 같이 자주 사용하는 몇 가지 모나드가 있습니다.

Optional.of(2).flatMap(f -> Optional.of(3).flatMap(s -> Optional.of(f + s)))

Optional 을 모나드라고 부르는 이유는 무엇일까요? 여기에서 Optional을 사용하면 of를 사용해 값을 래핑하고 flatMap 메서드를 사용하여 다른 래핑된 값을 추가하는 변환을 적용하고 있습니다

 

원한다면 Optional이 모나드의 세 가지 법칙을 따른다는 것을 보여줄 수 있습니다. 그러나 비평가들은 Optional이 어떤 상황에선 모나드 법칙을 어긴다고 지적할 것입니다. 그러나 대부분의 실제 상황에서는 이 정도면 충분합니다.

 

모나드의 기본 사항을 이해한다면 Stream 및 CompletableFuture와 같은 Java에 다른 많은 예제도 있음을 깨닫게 될 것입니다.  우리는 로그(log) 모나드, 보고(report) 모나드 또는 감사(audit) 모나드와 같은 다양한 목표를 달성하기 위해 Java에서 자체 모나드 유형을 정의할 수 있습니다. 함수형 프로그래밍에서 부작용 처리(side-effect)에 대해 논의한 것을 기억하십니까? 모나드는 그것을 달성하기 위한 함수형 프로그래밍 기술 중 하나인 것처럼 보이네요.

 

4.3. 커링 (Currying)

 

Currying은 여러 인수를 취하는 함수를 단일 인수를 취하는 일련의 함수로 변환하는 수학적 기술입니다. 우리는 왜 함수형 프로그래밍에서 이 기술이 필요할까요? 모든 인수를 이용해 함수를 호출할 필요가 없는, 강력한 구성 기술을 제공하기 때문입니다.

 

게다가 커링 함수는 모든 인수를 받을 때까지 그 효과를 깨닫지 못합니다.

Haskell과 같은 순수 함수형 프로그래밍 언어에서는 커링이 잘 지원됩니다. 사실, 모든 함수는 기본적으로 커링이 됩니다. 그러나 Java에서는 그렇게 간단하지 않습니다.

 

Function<Double, Function<Double, Double>> weight = mass -> gravity -> mass * gravity;

 

Function<Double, Double> weightOnEarth = weight.apply(9.81);

logger.log(Level.INFO, "My weight on Earth: " + weightOnEarth.apply(60.0));

 

Function<Double, Double> weightOnMars = weight.apply(3.75);

logger.log(Level.INFO, "My weight on Mars: " + weightOnMars.apply(60.0));

위 코드는 우리는 행성에서 우리의 무게를 계산하는 함수를 정의한 것입니다. 우리의 질량은 동일하게 유지되지만 중력은 우리가 있는 행성에 따라 다릅니다. 특정 행성에 대한 함수를 정의하기 위해 중력만 전달하여 함수를 부분적으로 적용할 수 있습니다. 또한 이 부분적으로 적용된 함수를 임의의 구성에 대한 인수 또는 반환 값으로 전달할 수 있습니다.

 

Currying은 람다 식과 클로저라는 두 가지 기본 기능에 기초를 둡니다. 람다 표현식은 코드를 데이터로 취급하는 데 도움이 되는 익명 함수입니다. 우리는 이전에 기능적 인터페이스를 사용하여 구현하는 방법을 보았습니다.

 

이제 람다 식은 우리가 클로저로 정의하는 어휘 범위에서 닫힐 수 있습니다. 예를 들어 보겠습니다.

private static Function<Double, Double> weightOnEarth() {

    final double gravity = 9.81;

    return mass -> mass * gravity;

}

위의 메서드에서 반환하는 람다 식이 어떻게 우리가 클로저라고 부르는 둘러싸는 변수에 의존하는지 주목하십시오. 다른 함수형 프로그래밍 언어와 달리 Java는 둘러싸는 범위가 최종적이거나 사실상 최종적이어야 한다는 제한이 있습니다.

흥미로운 결과로, 커링을 사용하면 Java에서 임의의 arity의 기능적 인터페이스를 만들 수도 있습니다.

 

from 모던자바인액션


예를 들어, 섭씨를 화씨로 변환하는 공식이 있다고 치자.

CtoF(x) = x*9/5 + 32

 

이것을 메서드로 표현하면

 

static double converter(double x, double f, double b) {

return x * f + b;

}

x는 변환하려는 값, f는 변환요소, b는 기준치 조정 요서인데 온도뿐 아니라 킬로미터와 마일 등의 단위도 변환해야 할 것이다. 기존 로직을 커링이라는 개념을 활용해서 한 개의 인수를 갖는 변환 함수를 생산하는 '팩토리'를 정의해보자

 

static DoubleUnaryOperator curriedConverter(double f, double b) {

return (double x) -> x * f + b;

}

DoubleUnaryOperator converterCtoF = curriedConverter(9.0/5, 32);

DoubleUnaryOperator convertUSDtoGBP = curriedCOnverter(0.6, 0);

DoubleUnaryOperator convertKmtoMi = curriedConverter(0.6214, 0);

double gbp = convertUSDtoGBP.applyAsDouble(1000);

기존의 변환 로직을 재활용하는 유연한 코드를 얻었다. 

참고로, DoubleUnaryOperator는 applyAsDouble 이라는 메서드를 정의한다.


4.4. 재귀

 

재귀는 문제를 작게 나누는데 도움을 주는 좋은 기술입니다. 재귀의 주요 이점은 side-effect를 제거하는 데도 도움됩니다. 

from 모던자바인액션


순수 함수형 프로그래밍 언어에서는 while, for 같은 반복문을 포함하지 않는다. 그런 함수형 프로그래밍의 장점이 분명히 있지만 무조건 반복보다는 재귀가 좋다고 주장은 옳지 않다. 왜냐하면 반복 코드보다 재귀 코드가 더 비싸기 때문, 재귀함수를 호출할 때마다 호출 스택에 각 호출시 생성되는 정보를 저장할 새로운 스택 프레임이 만들어진다. 즉, 재귀 팩토리얼의 입력값에 비례해서 메모리 사용량이 증가한다. 그러면 어떡할까? 함수형 언어에서는 '꼬리 호출 최적화' 라는 해결책을 제공한다. 

 

// 일반 재귀 방식의 팩토리얼

static long factorialRecursive(long n) {

return n == 1 ? 1 : n * factorialRecursive(n-1);

}

// 꼬리 재귀 팩토리얼

static long factorialTailRecursive(long n) {

return factorialHelper(1, n);

}

 

static long factorialHelper(long acc, long n) {

return n == 1 ? acc : factorialHelper(acc * n, n-1);

}

컴파일러가 하나의 스택 프레임을 재활용할 가능성이 생긴다. 재귀 호출 이후 추가적인 연산을 요구하지 않도록 구현하는 것이 핵심이다. 아쉽게도 자바는 이와 같은 최적화를 재공하지 않는다, 하지만 여러 컴파일러 최적화 여지를 남겨둘 수 있는 꼬리재귀를 이용하는 것이 좋다. 


 

5. 함수형 프로그래밍이 왜 중요한가?

 

Java를 포함한 모든 언어에서 함수형 프로그래밍을 채택할 때의 가장 큰 이점은 순수 함수와 불변 상태입니다. 돌이켜 생각해보면 대부분의 프로그래밍 문제는 부작용과 변경 가능한 상태가 원인이 되고 있습니다. 따라서 그것들을 제거하기만 하면 우리 프로그램을 더 쉽게 읽고, 추론하고, 테스트하고, 유지 관리할 수 있습니다.

 

선언적 프로그래밍의 하위 집합인 함수형 프로그래밍은 고차 함수, 함수 구성 및 함수 연결과 같은 여러 구문을 제공합니다. Stream API가 데이터 조작을 처리하기 위해 Java 8에 가져온 이점을 생각해 보십시오.

 

함수형 프로그래밍을 사용하기 시작하기 전에 우리는 프로그램에 대해 함수 측면에서 생각하도록 훈련해야 합니다.

 

 

6. 자바는 함수형 프로그래밍에 적합 할까요?

 

Java에 진정한 함수 타입이 없다는 것은 함수형 프로그래밍의 기본 원칙에 위배됩니다. lamda expression으로 위장한 기능적 인터페이스는 적어도 구문적으로는 대부분 이를 보완하긴 하지만 Java가 기본적으로 mutable하고, immutable 타입으로 변경하려면 너무나도 많은 상용구를 작성해야 하는 것도 사실입니다. 

Java에서 arguments에 대한 기본 전략은 eager이지만 lazy evalauation은 더 효과적이고 함수형 프로그래밍에서 추천되는 전략입니다. 우리는 연산자 단락 및 기능 인터페이스를 사용하여 Java에서 지연 평가를 달성할 수 있지만 더 복잡합니다.

type-erasure, 꼬리호출 최적화 등등이 완성된 형태가 아니기 때문에 Java는 함수형 프로그래밍에서 처음부터 프로그램을 시작하는 데 적합하지는 않습니다.

그러나 이미 Java로 작성된 기존 프로그램이 있는 경우(아마도 객체 지향 프로그래밍으로) 어떻게 될까요? 특히 Java 8에서 함수형 프로그래밍의 이점을 취하는 데에는 문제가 없겠습니다. 

함수형 프로그래밍의 대부분의 이점은 Java 개발자에게 달려있습니다. 객체 지향 프로그래밍과 함수형 프로그래밍의 이점을 결합하면 먼 길도 쉽게 갈 수 있을 것입니다. 

 

 

7. 결론

 

이 튜토리얼에서는 함수형 프로그래밍의 기초를 살펴보았습니다. 우리는 기본 원칙과 Java에서 이를 채택하는 방법을 다루었습니다. 또한 Java의 예제를 사용하여 함수형 프로그래밍에서 몇 가지 인기 있는 기술에 대해 논의했습니다.

마지막으로, 우리는 함수형 프로그래밍을 채택할 때의 이점 중 일부를 다루었고 Java가 이에 적합한지 답했습니다.

 

 

토스에서 진행한 SLASH 22를 보고 Kafka에 대한 호기심이 강하게 들었다. (이름은 많이 들어봤는데 그동안 알아볼 생각을 안했다ㅠ)

현재 팀에서 내가 맡아서 관리해야 될 서버들이 점점 늘어나고 있는데 

여기서 이 서버들이 실제 오픈을 하고 운영 레벨로 넘어가게 되면 모니터링을 하기가 굉장히 어려워질 것임을 예상했다.

현재 팀에는 딱히 모니터링 시스템이라고 할 만한 게 없고 (AWS CloudWatch를 이용하는 정도?)

내가 원하는 것은 어플리케이션 단의 모니터링 시스템이 었기 때문에

이 참에 Kafka를 활용해서 한 번 모니터링 시스템을 구축해보면 어떨까 하는 생각이 들었다. 

일단 그 생각의 실현을 위한 첫 스텝으로 Kafka가 뭔지 알아보려 한다. 

 

 

아래는 최범균 님의 'kafka 조금 아는척하기 시리즈' 유튜브 영상을 보며 정리한 내용입니다. 

 

Kafka 

분산 이벤트 스트리밍 플랫폼

 

 

4개의 구성요소

  • 프로듀서 
    • 메시지(이벤트)를 카프카에 넣는다
  • 컨슈머
    • 메시지(이벤트)를 카프카에서 읽는다.
  • 카프카 클러스터
    • 메시지(이벤트)를 저장한다.
      • 하나의 카프카 클러스터는 여러 개의 브로커로 구성되어 있으며 각각 서버라고 보면 된다. 
      • 브로커는 메시지를 나눠서 저장하고, 이중화 처리도 하고, 장애가 나면 대체도 하는 등의 역할을 수행한다. 
  • 주키퍼 클러스터 (주키퍼 앙상블) 
    • 카프카 클러스터를 관리하는 용으로 클러스터 정보가 저장되어 관리가 됩니다.
    • 브로커가 한 개 밖에 없을 때에도 클러스터로 동작하는데 클러스터 내의 브로커에 대한 분산 처리를 주키퍼가 담당한다. 
      • 주키퍼
        • 분산 시스템에서 시스템 간이 정보 공유, 상태 체크, 서버들 간의 동기화를 위한 락 등을 처리해주는 '분산 코디네이션 시스템'. 카프카에서는 서버의 상태를 감지하기 위해 사용되며 새로운 토픽이 생성되었을 때 토픽의 생성과 소비에 대한 상태를 지정합니다. 

 

토픽과 파티션

  • 토픽
    • 카프카에서 메세지를 저장하는 단위가 토픽 
    • 여러 매세지가 있을 때 이 메세지가 어떤 종류의 메세지인지 구분할 필요가 있는데 이때 사용하는 것이 토픽이다
      • 예를 들어 주문용 토픽, 뉴스용 토픽 같이 각각의 메세지를 알맞게 구분하기 위해 토픽을 사용한다.
      • 파일 시스템의 폴더와 유사하다고 보면 된다. 
    • 한 개의 토픽은 한 개 이상의 파티션으로 구성된다. 
      • 파티션은 메세지를 저장하는 물리적인 파일 
        • 프로듀서와 컨슈머는 토픽을 기준으로 메세지를 주고받는다!
  • 파티션 (= 파일이라고 보면 된다)
    • 파티션은 추가만 가능한 파일이다. 
      • 각 메세지 저장 위치를 오프셋(offset)이라고 한다. 
        • 프로듀서가 카프카에 메세지를 저장하면 저장된 메세지는 offset1, offset2 이렇게 오프셋 값을 가지게 된다.
        • 여러 consumer가 한 topic(일종의 queue 개념)으로부터 여러 번에 걸쳐 메시지를 가져올 수 있습니다. 이런 방식이 가능한 이유는 클라이언트가 해당 queue에서 어느 부분까지 데이터를 받아갔는지 위치를 알려주는 'offset'을 관리하기 때문입니다.
      • 프로듀서가 넣은 메세지는 파티션의 맨 뒤에 추가한다.
      • 컨슈머는 오프셋 기준으로 메세지를 순서대로 읽는다.
      • 메세지는 삭제되지 않는다. (설정에 따라 일정 시간이 지난 뒤 삭제)
    • 한 파티션 내에서만 메세지 순서가 보장된다.

여러 파티션과 프로듀서

  • 프로듀서는 라운드로빈 또는 키로 파티션을 선택한다. 혹은 키를 이용해서 파티션을 선택한다.
    • 프로듀서가 카프카에 메세지를 전송할 때 토픽의 이름 뿐만 아니라 키를 지정할 수 있는데 키가 있는 경우에는 그 키의 해시값을 이용해서 저장할 토픽을 선택할 수 있게 된다. 그래서 같은 키를 갖고 있는 메세지는 같은 파티션에 저장이 된다 (같은 키에 대해서는 메세지 순서가 정해진다)

여러 파티션과 컨슈머 

그룹에 속해있는 컨슈머들이 특정한 파티션을 공유할 수 없다.

  • 컨슈머는 컨슈머 그룹이라는 거에 속하게 되어 있는데 컨슈머가 카프카 브로커에 연결할 때 나는 어떤 그룹에 속해있다고 지정하게 되어있음 
    • 한 개의 파티션은 그룹의 한 개 컨슈머에만 연결이 가능하다. (= 컨슈머 그룹 기준으로 파티션의 메세지가 순서대로 처리되는 것을 보장할 수 있게 된다.)

카프카 성능이 왜 좋을까요?

  • 페이지캐쉬 - 카프카는 파티션 파일에 대해서 OS에서 제공하는 페이지 캐쉬를 이용하기 때문에 파일 IO가 메모리에서 처리되기 때문에 IO속도가 빨라진다.
    • 페이지 캐시란? ( https://medium.com/@tas.com/ )
      • 처리한 데이터를 메인 메모리 영역(RAM)에 저장해서 가지고 있다가, 다시 이 데이터에 대한 접근이 발생하면 disk에서 IO 처리를 하지 않고 메인 메모리 영역의 데이터를 반환하여 처리할 수 있도록 하는 컴포넌트다. 즉 OS가 파일을 read하여 메모리에 올려두고 있다가, 빠르게 접근하여 사용하겠다는 것.
      • 다시 kafka와 내용을 같이 보면, producer가 서버인 broker에게 넣는 데이터는 consumer가 사용하기 전 일정 시간동안 page cache 올려두어, consumer가 데이터를 읽어 갈 때 그 읽어가는 속도를 빠르게 한다는 것으로 이해하면 되겠다.
  • 제로카피 - 디스크에서 데이터를 읽어다가 네트워크로 보내는 속도가 빠르다.
    • 파일에서 소켓으로 데이터를 전송하는 전통적인 과정  -> 비효율 (4개의 사본과 2개의 시스템 호출 )
      1. 운영 체제는 디스크에서 커널 공간의 페이지 캐시로 데이터를 읽습니다.
      2. 응용 프로그램은 커널 공간에서 사용자 공간 버퍼로 데이터를 읽습니다.
      3. 응용 프로그램은 데이터를 다시 커널 공간에 소켓 버퍼에 쓴다.
      4. 운영체제는 소켓 버퍼에서 네트워크를 통해 전송되는 NIC 버퍼로 데이터를 복사한다.
        • Disk > Kernel(PageCache) > User-Space(Buffer) > Kernel(Socket Buffer) > Kernel(NIC Buffer)
    • kafka는 OS가 페이지캐시에서 네트워크로 데이터를 직접 보낼 수 있으므로 위와 같은 재복사가 방지됩니다. 따라서 이 최적화된 경로에서는 NIC 버퍼에 대한 최종 복사본만 필요합니다. 데이터가 메모리에 저장되고 읽을 때마다 사용자 공간으로 복사되는 대신 페이지 캐시에 정확히 한 번 복사되고 소비할 때마다 재사용됩니다.
    • pagecache와 sendfile의 조합 덕분에 Kafka 클러스터에서 디스크가 완전히 캐시에서 데이터를 제공하기 때문에 디스크에서 읽기 활동을 볼 수 없음을 의미합니다.

 

  • 빠르다 - 브로커가 컨슈머에 대해서 할 수 있는 역할이 없어서 상대적으로 빠르다. (제재하지 않고 프로듀서와 컨슈머가 직접 함) 
  • 일괄작업 - 묶어서 보내고 묶어서 받는다. (Batch) 프로듀서와 컨슈머는 일정 크기만큼 메시지를 모아서 전송 그리고 조회가 가능하다. 따라서 낱개로 건건히 보내는 것보다 아무래도 더 빨라질 수밖에 없음 
  • 처리량 조절 쉬움 -  그냥 브로커 추가하고 파티션 추가하거나 컨슈머가 느리다고 생각되면 컨슈머 추가하면 됨.
  • 장애 복구 간단 - 장애가 났을 때 대처하기 위해 리플리카를 사용한다.  (리더/팔로워 구조)
    • 리플리카는 파티션의 복제본으로 복제수만큼 파티션의 복제본이 각 브로커에 생김
      • 하나가 리더 나머지가 팔로워가 되어서 팔로워는 리더로부터 데이터를 읽어와서 저장하므로 리더가 속한 브로커가 장애가 발생하면 이때 다른 팔로워 중에서 하나가 리더가 되고, 프로듀서와 컨슈머는 신규 리더를 통해 메세지를 처리할 수 있게 됩니다. 

프로듀서

 

Properties prop = new Properties();
prop.put("bootstrap.servers", "kafka01:9092, kafka01:9092, kafka01:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<Integer, String> producer = new KafkaProducer<>(prop);
producers.send(new ProducerRecord<>("topicname", "key", "value")); // 방법1
producers.send(new ProducerRecord<>("topicname", "value")); // 방법2

producer.close();

KafkaProducer 클래스는 send 메소드를 제공하며 이 send 메소드에 ProducerRecord를 전달하면 됩니다. 

ProducerRecord가 브로커에 전달할 카프카 메세지가 됩니다.

 

프로듀서 내부 동작 흐름

 

(강의에 나온 내용 대사 그대로를 옮겨 적었습니다.)

send() 메소드를 통해 레코드를 전달하면 Serializer를 통해 byte 배열로 변환하고

Partitioner를 이용해 메세지를 어느 토픽의 파티션으로 보낼지 결정합니다.

그리고 변환된 바이트 메시지를 버퍼에 저장하는데 버퍼에 바로 저장하지 않고 배치로 묶어서 저장하게 됩니다.

그리고 sender를 통해 배치를 차례대로 가져와 카프카 브로커로 전송합니다.

  여기서 Sender는 별도 쓰레드로 동작하며 배치가 찼는지 여부에 상관없이 보내며

Sender 쓰레드와는 별개의 쓰레드에서 send 메서드를 통해 메세지를 배치로 모으게 됩니다.

(즉, 메세지를 모으는 쓰레드와 배치를 전송하는 쓰레드는 다릅니다.)

배치하고 sender와 관련된 설정이 처리량에 영향을 주게 됩니다. (batch.size / linger.ms)

batch.size는 배치의 최대 크기를 지정하고 지정한 크기만큼 메세지가 차면 메세지가 바로 전송을 하게 됩니다.

그래서 배치 사이즈가 너무 작으면 한 번에 보낼 수 있는 메세지 크기가 작고, 전송 횟수가 많아 처리량이 떨어지게 되겠죠?!

linger.ms는 센더가 메시지를 보내는 대기 시간입니다.

기본값은 0이며 대기시간을 주게 되면 기다렸다 배치를 전송하기 때문에 한 번에 많은 메세지를 보내게 됩니다. 

 

send() 메소드를 통해 전송한 것은 결과를 확인하지 않습니다. (실패 여부 모름) 따라서 실패에 대한 별도 처리가 필요없는 메시지 전송에 사용합니다. 그런데 실패 여부를 알아야 될 때가 있는데 이 때 두 가지 방법이 사용 가능하다.

 

전송 후 실패 여부를 알고 싶다면

 

  1.   Future 사용 (처리량이 낮아도 정확해야 하는 경우)

get()을 사용하면 블로킹이 되기 때문에 루프를 돌면서 전송하는 경우에는 전송-블로킹-전송-블로킹이라

배치에 메시지가 1개씩만 들어가기 때문에 처리량도 떨어짐

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));
try {
	RecordMetadata meta = f.get(); // 블로킹
} catch (ExecutionException ex) {}

 

2.   Callback 사용 

콜백 객체는 전송 후 전송 결과를 onCompletion 메서드로 받게 되는데 Exception을 받게 되면 전송이 실패된 것.

처리량이 떨어지지 않는다. 

producer.sned(new ProducerRecord<>("simple", "value"),
	new Callback() {
    	@Override
        public void onCompletion(RecordMetadata metadata, Exception ex) {
        }
    });

 

전송보장과 ack

 

Producer는 전송을 보장하기 위해 ack값을 제공한다. ack가 0이면 처리량은 많아 지겠지만 메세지 전송 여부는 알 수 없습니다. ack가 1이면 파티션의 리더에 값이 저장되면 성공 응답을 알려줍니다. 따라서 리더에 장애가 발생하면 메세지가 유실될 가능성이 있습니다. (팔로워에 저장이 아직 안됐는데 성공 응답을 내려주었고, 이 상태로 리더에 장애가 발생하는 경우가 이에 해당된다) ack가 all 이면 모든 팔로워에 다 저장이 되었을 때 응답을 내려줍니다. 따라서 메시지 유실이 없어야 되는 경우에는 all로 주는 것이 맞다고 볼 수 있다. 

 

+ 전송하다 에러가 나는 경우 재시도가 가능한 경우에는 kafka에서 재시도를 수행한다. 

+ enable.idempotence 속성을 사용하면 메시지 중복 전송 가능성을 낮출 수 있다. 

 

재시도와 순서

재시도의 주의 사항은 중복 전송과 순서가 바뀐다는 것이다.

  • max.in.flight.requestes.per.connection 옵션
    • 블록킹 없이 한 커넥션에서 전송할 수 있는 최대 전송중인 요청 개수

이 옵션값이 1보다 크게 되면 재시도가 언제 이뤄지냐에 따라 메시지 순서가 바뀔 수 있다. 

따라서, 전송 순서가 중요하면 이 값을 1로 지정해야 한다. 

 

 


 

컨슈머

 

토픽 파트션에서 특정 레코드 조회

Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("group.id", "group1");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDesrializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String> (prop);
consumer.subscribe(Collections.singleton("simple")); // 토픽 구독
while (조건) {
	ConsumerRecords<String, String> recoreds = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record: records) {
    	System.out.println(record.value() + ":" + record.topic() + ":" + 
        	record.partition() + ":" + record.offset());
    }
}

consumer.close();

토픽 파티션은 그룹 단위로 할당된다. 

각 컨슈머가 파티션에 연결되는데 파티션보다 컨슈머가 더 많이 생기면 이후로 생기는 컨슈머는 놀게 된다. 특정 파티션에 연결될 수 없기 때문에.. 그래서 컨슈머 개수가 파티션 개수보다 커지면 안되고, 처리량이 떨어져서 컨슈머가 커져야 되면 파티션 개수도 함께 늘려야 합니다.

 

커밋과 오프셋

컨슈머의 poll 메소드는 이전에 커밋한 오프셋이 있으면 그 오프셋 이후의 레코드를 읽어 옵니다. 

만약에 poll 메소드로 레코드를 읽어오려는데 커밋된 레코드가 없는 경우에는 auto.offset.reset 옵션 설정값을 사용합니다.

 

auto.offset.reset

  • earliest : 맨 처음 오프셋 사용
  • latest : 가장 마지막 오프셋 사용
  • none :  익셉션 발생 

컨슈머 설정

조회에 영향을 주는 주요 설정

  • fetch.min.bytes
    • 조회시 브로커가 전송할 최소 데이터 크기
      • 기본값은 1이며 이게 크면 대기 시간이 늘지만 처리량은 올라감
  • fetch.max.wait.ms
    • 데이터가 최소 크기가 될 때까지 기다릴 시간 
      • 기본값은 500(0.5초)이며 브로커가 리턴할 때까지 대기하는 시간으로 poll() 메서드의 대기 시간과 다름
  • max.partition.fetch.bytes
    • 파티션 당 서버(브로커)가 리턴할 수 있는 최대 크기 
      • 기본값은 1MB

자동 커밋 / 수동 커밋

enable.auto.commit 설정을 통해 자동 커밋할지 수동 커밋을 할지 결정한다. 

true이면 일정 주기로 컨슈머가 오프셋을 커밋하고, false면 수동으로 진행되게 된다.

자동 커밋은 poll(), close() 메서드 호출시 자동으로 실행된다.

 

수동커밋 방법 

  1. 동기

ConsumerRecord<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
	처리
}
try {
	consumer.commitSync();
} catch (Exception ex) {
	// 커밋 실패시 에러 발생
    // 실패하면 알맞은 처리 하면 됨
}

  2. 비동기

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
	..처리
}
consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)

비동기이기 때문에 코드 자체에서 실패 여부를 바로 알 수가 없고, 알고 싶으면 콜백을 받아서 처리해야 한다. 

 

컨슈머가 동일한 메시지를 읽어올 수 있다. 커밋이 실패했다거나 컨슈머가 추가 되는 케이스에서 리밸런싱이 일어나고, 이 과정에서 동일한 메시지를 읽어올 수 있다. 이때문에 멱등성(idempotence - 연산을 여러 번 적용하더라도 결과가 달라지지 않는 성질)을 고려해야 한다. 그리고 데이터 특성에 따라 타임스탬프, 일련 번호 등을 활용해야 한다. 

 

 

세션 타임아웃, 하트비트, 최대 poll 간격

컨슈머는 하트비트를 계속 브로커에 전송해서 연결을 유지한다. 브로커는 일정 시간 동안 컨슈머를 통해 하트비트를 전달받지 못하면 그룹에서 빼버린다. 그리고 리밸런싱을 진행한다. 

관련 설정은 

  • session.timeout.ms : 세션 타임 아웃 시간 (기본값 10초)
  • hearbeat.interval.ms : 하트비트 전송 주기 (기본값 3초)
    • session.timeout.ms의 1/3 이하 추천
  • max.poll.interval.ms : 정해진 시간 동안 poll()하지 않으면 컨슈머를 그룹에서 빼고 리밸런스를 진행한다. 

 

세션 종료

보통은 무한 루프를 돌면서 poll() 메서드로 레코드를 불러온느 코드를 작성하게 되는데 이 loop를 어떻게 벗어날 수 있을까! 바로 wakeup() 메서드를 호출한다. finally에서 consumer.close()를 해주자~

 

KafkaConsumer는 쓰레드에 안전하지 않기 때문에 여러 쓰레드에서 동시에 사용하지 않아야 한다. 

 


추가 내용

 

주키퍼 

• 주키퍼 사용용도

주키퍼는 클러스터에서 구성 서버들끼리 공유되는 데이터를 유지하거나 어떤 연산을 조율하기 위해 주로 사용

  • 설정 관리 : 클러스터의 설정 정보를 최신으로 유지하기 위한 조율 시스템으로 사용됩니다.
  • 클러스터 관리 : 클러스터의 서버가 추가되거나 제외될 때 그 정보를 클러스터 안 서버들이 공유하는 데 사용됩니다.
  • 리더 채택: 다중 어플리케이션 중에서 어떤 노드를 리더로 선출할 지를 정하는 로직을 만드는 데 사용됩니다. 주로 복제된 여러 노드 중 연산이 이루어지는 하나의 노드를 택하는 데 사용됩니다.
  • 락, 동기화 서비스 : 클러스터에 쓰기 연산이 빈번할 경우 경쟁상태에 들어갈 가능성이 커집니다. 이는 데이터 불일치를 발생시킵니다. 이 때, 클러스터 전체를 대상을 동기화해( 락을 검 ) 경쟁상태에 들어갈 경우를 사전에 방지합니다.

 

Kafka vs RabbitMQ

둘 다 pub/sub 기반의 메시지 큐 서비스인데 Kafka는 이벤트 브로커이고, RabbitMQ는 메세지 브로커이다. 

이벤트 브로커는 메세지 브로커의 기능을 포함하는 더 큰 범위의 개념이기에 이벤트 브로커가 메세지 브로커 역할을 수행할 수도 있다.

메세지 브로커는 중간 다리 역할을 수행하는 broker로 publisher가 생산한 메세지를 큐에 저장하고, consumer가 데이터를 가져가면 즉시 혹은 짧은 시간 내에 큐에서 데이터를 삭제한다. 보통 서로 다른 시스템 사이에서 데이터를 비동기 형태로 처리하고 싶을 때 사용하며 AWS에서는 비슷하게 SQS가 있다. 

 

이벤트 브로커는 publisher가 생산한 이벤트를 저장하고,  consumer가 해당 이벤트를 사용하더라도 이벤트가 저장된다는 특징으로 이후에 다시 재사용 할 수 있는 장점을 가지고 있다. 

 

(https://www.cloudamqp.com/blog/when-to-use-rabbitmq-or-apache-kafka.html)

RabbitMQ 사용 사례 
  • 일반적으로 단순/전통적인 pub-sub 메시지 브로커를 원하는 경우 확실한 선택은 RabbitMQ입니다. 요구 사항이 channels/queues을 통한 시스템 통신을 처리할 만큼 간단하고, 메세지를 보존하거나 스트리밍을 요구하는게 아닌 경우에 말이다. 
  • 두 개의 주요 사용 사례로 나눌 수 있다.
    • LONG-RUNNING TASKS
      • RabbitMQ를 사용하는데 오래 걸리는 작업이 백그라운드에서 안정적으로 실행되어야 할 때
    • MIDDLEMAN IN A MICROSERVICE ARCHITECTURES
      • 애플리케이션 내부 및 애플리케이션 간의 통신 및 통합을 위한 경우
      • 마이크로서비스 간의 중개자로서 시스템에게 단순히 작업을 실행하라는 것을 알릴 때 예를 들면 주문 처리나 주문 상태 업데이트 같은 경우다. 

Apache Kafka 사용 사례

  • 일반적으로 스트리밍 데이터를 저장, 읽기(다시 읽기), 분석하기 위한 프레임워크를 원한다면 Apache Kafka를 사용합니다. 감사를 받거나 메시지를 영구적으로 저장해야 하는 시스템에 이상적입니다.
  • 두 개의 주요 사용 사례로 나눌 수 있다.
    • DATA ANALYSIS (추적, 수집, 로깅, 보안 등)
      • 데이터를 분석해서 Insights를 얻고, 수많은 데이터에 대한 감사 또는 분석이 필요한 경우.
      • 주요 분석, 검색 및 저장 시스템
    • 실시간 처리
      • 처리량이 많은 분산 시스템 역할을 합니다. 소스 서비스는 데이터 스트림을 실시간으로 가져오는 대상 서비스로 푸시
      • Kafka는 적은 수의 소비자와 실시간으로 많은 생산자를 처리하는 시스템에서 사용할 수 있습니다. 즉, 주식 데이터를 모니터링하는 금융 IT 시스템.

 

  RabbitMQ Apache Kafka
무엇인가? 견고하고 성숙한 범용 메시지 브로커 높은 유입 데이터 스트림 및 재생에 최적화된 Message Bus
주요 용도 애플리케이션 내부 및 애플리케이션 간의 통신 및 통합으로 기 실행 작업 또는 안정적인 백그라운드 작업을 실행해야 하는 경우 스트리밍 데이터의 저장, 읽기(다시 읽기) 및 분석
메세지 지속성 수신 확인 시 삭제 보존 기간 옵션에 따라 메세지 유지
(수신 되어도 삭제하지 않음)
라우팅 소비자 노드에 정보를 반환할 수 있는 유연한 라우팅 지원 유연한 라우팅을 지원하지 않으며 별도의 주제를 통해 수행해야 합니다.
메시지 우선순위 지원 지원하지 않음
 
 

 

 

 

출처

https://www.youtube.com/watch?v=0Ssx7jJJADI (최범균 님의 kafka 조금 아는척하기 시리즈)

https://epicdevs.com/17

https://m.blog.naver.com/kgw1988/221212827363 (카프카에서의 데이터 저장 방식)

https://kafka.apache.org/documentation/#gettingStarted 공식문서

 https://programacion.tistory.com/156 [KA's Regalo:티스토리]

https://www.cloudamqp.com/blog/when-to-use-rabbitmq-or-apache-kafka.html

+ Recent posts