차가운것을 뜨겁게 바꿔주는 Subject 클래스 (AsyncSubject)

2018. 12. 10. 19:21리액티브 프로그래밍/RxJava


Subject클래스 (AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등 )


AsyncSubject 클래스

Observable에서 발행한 마지막 데이터를 얻어올 수 있는 Subject 클래스 입니다.

완료되기 전 마지막 데이터에만 관심이 있으며 이전 데이터는 무시합니다.

AsyncSubject 클래스의 마블다이어 그램입니다.

이 다이어 그램을 보면, 방금 위에서 설명한 마지막 데이터에만 관심이 있다는 말이 쉽게 와 닿습니다.

처음 구독자가 subscribe() 함수를 호출합니다.

후에 첫번째 원, 두번째 원이 발행된 후, 두번째 구독자가 subscribe() 함수를 호출합니다.

마지막으로 세번째 원이 발행된후, 데이터 발행을 완료하는 onComplete 이벤트 발생합니다.

이때 완료되기 전까지는 구독자들에게 데이터 전달을 하지 않다가, 완료됨과 동시에 첫번째와 두번째 구독자엑 ㅔ마지막 데이터를 발행하고 종료합니다.


AsyncSubject subject = AsyncSubject.create();
subject.subscribe(data->System.out.println("Subscriber #1 => " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data->System.out.println("Subscriber #2 => " + data));
subject.onNext("5");
subject.onComplete();

위 코드가 위에서 설명한 마블다이어그램와 일치하는 RxJava 코드이다.

subscribe()메소드로 구독자가 구독을 했을경우, 원래라면 바로 결과가 출력이 되어야 한다. 하지만 AsyncSubject 객체의 경우 onComplete() 메소드가 실행될 때 까지 결과를 보류 시킨다.

그리고 세번째 데이터가 발행된 후에 onComplete() 메소드가 실행되고 나서야 subscribe() 메소드의 결과를 나타낸다.

따라서 위 코드의 결과는 아래와 같다.

Subscriber #1 => 5

Subscriber #2 => 5


AsyncSubject 클래스는 구독자로도 동작할 수 있습니다. 아래 예제를 확인해 봅시다.

package reactivejava;

import io.reactivex.Observable;
import io.reactivex.subjects.AsyncSubject;

public class FirstExample {
	
	public void emit() {
		Float[] temperature = {10.1f, 13.4f, 12.5f };
		Observable source = Observable.fromArray(temperature);

AsyncSubject subject = AsyncSubject.create(); subject.subscribe(data -> System.out.println("Subscriber #1 => " + data)); source.subscribe(subject); } public static void main(String[] args) { FirstExample demo = new FirstExample(); demo.emit(); } }

먼저 Float 타입을 담는 배열을 생성합니다. 그리고 그 배열을 담는 Observable을 생성합니다.

subject 변수에 AsyncSubject 객체를 생성하고 data를 수신할 수 있도록 subscribe()함수를 호출합니다.

마지막으로 subject 변수는 Observable인 source를 구독합니다.


쉽게 말해 배열을 담고있는 Observable 객체를 만들고,   data를 받아 출력하는 AsyncSubject 객체를 생성후.

Observable 을 AsyncSubject 객체로 구독한 경우입니다.


결과는 Float배열의 마지막 원소인 12.5가 data로 받아져

Subcriber #1 => 12.5

로 나타나 집니다.


마지막으로 AsyncSubject에 클래스에서 onComplete()함수를 호출한 후에 구독할 때를 살펴보겠습니다.





package reactivejava;

import io.reactivex.Observable;
import io.reactivex.subjects.AsyncSubject;

public class FirstExample {
	
	public void emit() {
		AsyncSubject subject = AsyncSubject.create();
		subject.onNext(10);
		subject.onNext(11);
		subject.subscribe(data->System.out.println("Subscriber #1 => " + data));
		subject.onNext(12);
		subject.onComplete();
		subject.onNext(13);
		subject.subscribe(data->System.out.println("Subscriber #2 => " + data));
		subject.subscribe(data->System.out.println("Subscriber #3 => " + data));
	}
	
	public static void main(String[] args) {
		FirstExample demo = new FirstExample();
		demo.emit();
	}
}

10, 11 이라는 데이터 발행후 첫 구독자가 subscribe() 호출하고, 12 라는 데이터 발행 후 onComplete() 함수 호출 했습니다. Observable과 마찬가지로 onComplete()함수 호출 이후에는 onNext 이벤트를 무시합니다. 그 다음 ㄷ붠째와 세번째 구독자가 subscribe()함수를 호출했습니다.

이 예제의 실행 결과는 예상했던데로,

Subscriber #1 => 12

Subscriber #2 => 12

Subscriber #3 => 12


입니다.


다음은 BehaviorSubject 클래스 입니다. 다음 글에서 이어 가겠습니다.


'리액티브 프로그래밍 > RxJava' 카테고리의 다른 글

뜨거운 Observable  (0) 2018.12.10
RxJava 처음 시작하기(2)  (0) 2018.12.10
RxJava 처음 시작하기(1)  (0) 2018.12.07