Reactive Stream

본격적으로 Reactive Stream 에 대해 알아보겠다.

(by java.util.concurrent.Flow)

 

Publisher

Reactive Stream 의 Publisher 이다.

- publisher 는 데이터 혹은 이벤트(메시지)를 제공한다.

 

subscribe

- publisher 에 subscriber 를 등록할 수 있다. (다수를 등록할 수 도 있다.)

 

 

Subscriber

Reactive Stream 의 Subscriber 이다.

Subscriber 는 Publisher 로 부터 다양한 이벤트가 들어오는 채널을 구현해야한다.

 

참고

publisher 에서 subscriber 에게 데이터 혹은 이벤트를 전달하는 방식은 push 방식이다.

 

onSubscribe

- publisher 에서 호출하는 메서드이다. (채널)

- publisher 는 Subscription 을 subscriber 에게 전달한다.

- onSubscribe 라는 이벤트가 전달되는 채널

 

onNext

- publisher 에서 호출하는 메서드이다. (채널)

- 해당 채널을 통해 값을 전달 받는다.

- onNext 라는 이벤트가 전달되는 채널

- 다른 이벤트와 다르게 여러번 전달될 수 있다.

 

onError

- publisher 에서 호출하는 메서드이다. (채널)

- 해당 채널을 통해 Publisher 에서 발생된 에러를 전달 받는다.

- onError 라는 이벤트가 전달되는 채널

 

onComplete

- publisher 에서 호출하는 메서드이다. (채널)

- 해당 채널을 통해 Publisher 에서 발생된 완료 이벤트를 전달 받는다.

- onComplete 라는 이벤트가 전달되는 채널

 

 

Subscription

Reactive Stream 의 Subscription 이다.

Subscriber 는 Subscription 으로 Publisher 가 전달하는 이벤트를 조절할 수 있다.

즉, back-pressure 개념을 사용할 수 있다. 또한, 취소도 가능하다.

 

request

- subscriber 가 publisher 로 부터 데이터 몇개를 받을 건지 정한다.

 

cancel

- subscriber 가 publisher 로 부터 더이상의 이벤트를 받지 않겠다고 말함.

 

 

흐름

이제 전체적인 흐름을 알아보겠다.

 

아래에서 메서드 호출은 메시지(이벤트) 전달이라 봐도 무방하다.

 

1. subscribe

- Publisher 의 subscribe 메서드를 호출하여 Subscriber 를 등록한다.

 

2. onSubscribe

- Publisher 가 Subscriber 의 onSubscribe 를 호출하여 Subscription 을 넘겨준다.

 

3. request(n) / cancel

- request(n) 을 통하여 Publisher 에게 데이터를 n 개 요청한다.

- cancel 을 통하여 Publisher 에게 더이상 이벤트를 받지 않겠다고 한다.

 

4. onNext(data)

- Publisher 가 n 개의 데이터를 한개씩 Subscriber 의 onNext 로 넘겨준다.

 

5. onComplete / onError

- Publisher 가 데이터를 모두 전달하면 subscriber 의 onComplete 를 호출한다.

- Publisher 에서 에러가 발생하면 subscriber 의 onError 를 호출한다.

 

 

이제 다시..

Reactive Programming 포스팅에서 했던 가정을 여기에도 적용해보자..

 

Publisher 를 리턴 타입으로 하는 함수를 생각하겠다.

 

Caller 입장에서 생각해보자.

Caller 는 Callee 로 부터 Publisher 를 받았다.

Publisher 타입으로 받아서

중간 파이프라인을 붙이거나 혹은 onNext 를 구현함으로써

"특정 작업"을 처리할 수 있다.

 

즉,

1. Publisher 에서 onNext 를 호출하기 때문에 (push 모델)

"특정 작업" 의 실행을 Caller 에서 Publisher 로 넘겼다. (비동기)

(Publisher 는 Callee, Subscriber 는 Caller 가 아닐 수 있다.)

-> 좀 더 풀어서 설명하자면..

Callee 는 Publisher 를 반환하고 Caller 는 Subscriber 를 등록한다.

이후, Publisher 가 Subscriber 로 이벤트를 Push 해주기 때문에

(특정 작업은 Publisher 실행 스레드 혹은 제 3의 스레드 일 수 있음)

Caller 기준에서 특정 작업은 비동기로 동작하게 된 것이다.

 

2. Publisher 는 그 자체로 Stream 이며

Subscription 으로 back-pressure 도 조절 가능하다.

 

 

Reactive Stream 의 구현 라이브러리

Reactive Stream 의 기본 구조를 살펴보았다.

위와 같은 구조에 강력한 중간 파이프라인(?) 연산자들을 제공하며

Scheduler, 지연 실행 등 Stream 과 CompletableFuture 에서 해결 못 한 것들 뿐만 아니라

다양한 기능을 제공하는게 Reactive Stream 의 구현 라이브러리이다..

 

대표적으로 3가지가 존재한다.

 

1. project reactor

- Spring Webflux 에서 사용한다.

- 유명한 Mono, Flux 가 여기에 해당한다.

 

2. RxJava

- netflix 에서 개발한 라이브러리로.. 가장 먼저 나왔다.

 

3. Mutiny

- Redhat 에서 개발하였다.