정확하게 표현하자면, 데이터 방출이 "재실행"됨
데이터를 observable 안쪽에서 생성하면 두 개의 구독이 서로 다른 값을 받게 됨
import * as Rx from "rxjs";
const observable = Rx.Observable.create((observer) => {
observer.next(Math.random());
});
// subscription 1
observable.subscribe((data) => {
console.log(data); // 0.24957144215097515 (random number)
});
// subscription 2
observable.subscribe((data) => {
console.log(data); // 0.004617340049055896 (random number)
});
데이터를 observable 바깥에서 생성하면 두 개의 구독이 같은 값을 받게 됨
import * as Rx from "rxjs";
const random = Math.random()
const observable = Rx.Observable.create((observer) => {
observer.next(random);
});
// subscription 1
observable.subscribe((data) => {
console.log(data); // 0.11208711666917925 (random number)
});
// subscription 2
observable.subscribe((data) => {
console.log(data); // 0.11208711666917925 (random number)
});
import * as Rx from "rxjs";
const observable = Rx.Observable.fromEvent(document, 'click');
// subscription 1
observable.subscribe((event) => {
console.log(event.clientX); // x position of click
});
// subscription 2
observable.subscribe((event) => {
console.log(event.clientY); // y position of click
});
cold observerable을 hot observable로 변환하는 방법은 여러가지가 있는데, 아주 간단하게 위에서 random값의 생성 위치를 바꾼 예제와 같은 방법이 있긴 하지만 현실적이진 않음. 일반적인 변환방법이 있는데, ConnectableObservable을 이용하는 방법이 있고, Subject를 이용하는 방법이 있음
publish()를 호출하여 observable을 ConnectableObservable로 변환할 수 있음
Observable<Long> myObservable = Observable.interval(1, TimeUnit.SECONDS);
ConnectableObservable<Long> connectableObservable = myObservable.publish();
connectableObservable.subscribe(Log::d);
Thread.sleep(5000);
이 코드는 subscribe()를 호출했기 때문에 아무 값도 방출되지 않음. 아래 코드는 connect()를 추가로 호출했기 때문에 데이터가 방출됨
다른 observer들을 단일한 observable source로 연결해주는 객체
subscribe()를 호출해도 방출하지 않지만 observer 중 하나라도 connect()를 호출하면 방출을 시작함
그렇기 때문에 여러 observer들이 하나의 observable의 값을 공유하게 되는 효과가 생김
Observable<Long> myObservable = Observable.interval(1, TimeUnit.SECONDS);
ConnectableObservable<Long> connectableObservable = myObservable.publish();
connectableObservable.subscribe(item -> Log.d("Observer 1: " + item));
connectableObservable.connect();
Thread.sleep(3000);
connectableObservable.subscribe(item -> Log.d("Observer 2: " + item));
Thread.sleep(5000);
두 개의 구독자가 다른 시각에 구독을 시작했을 때 구독 시점부터 같은 데이터를 받을 수 있도록 처리해 줌
connect()가 방출을 시작하게 하는 메소드라면, 더 이상 구독자가 없을 때 방출이 불필요해지므로 종료시키는 메소드가 필요한데 바로 refCount()가 이 역할을 해 줌. observable을 publish()한 다음에 refCount()를 걸어주는 코드을 참고할 것.
Observable<Long> myObservable = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> hotObservable = myObservable.publish().refCount();
Disposable subscription1 = hotObservable
.doOnSubscribe(d -> Log.d("Observer 1 subscribed"))
.doFinally(() -> Log.d("Observer 1 unsubscribed"))
.subscribe(item -> Log.d("Observer 1: " + item));
Thread.sleep(3000);
Disposable subscription2 = hotObservable
.doOnSubscribe(d -> Log.d("Observer 2 subscribed"))
.doFinally(() -> Log.d("Observer 2 unsubscribed"))
.subscribe(item -> Log.d("Observer 2: " + item));
Thread.sleep(3000);
subscription1.dispose();
subscription2.dispose();
hotObservable
.doOnSubscribe(d -> Log.d("Observer 3 subscribed"))
.doFinally(() -> Log.d("Observer 3 unsubscribed"))
.subscribe(item -> Log.d("Observer 3: " + item));
Thread.sleep(3000);
subject는 bridge나 proxy 역할을 하는 객체임
source observable(cold라도 괜찮음)을 바라보는 subject를 가지고 있으면 hot observable이 됨
source observable과 구독자 사이에 subject를 두면, subject만 source를 구독하고, subscriber들은 subject를 구독함으로써 source의 값을 여러 subscriber들이 공유할 수 있게 함(뜨거워졌음)
RxJava에서의 Subject는 Observable과 Observer 양쪽의 속성(메소드)을 가지는 클래스임. io.reactivex 패키지에 다음과 같이 정의되어 있음
public abstract class Subject<T>
extends Observable<T>
implements Observer<T>
Subject 클래스의 하위 클래스에 대한 자세한 내용은 다음 문서를 참고할 것
프로그래밍 언어마다 차이가 있을 수 있음
RxJS에서는 observable이 데이터 생성자를 포함해야 하므로 팩토리 메소드를 구현해주는 방법을 사용함
const observableFactory = () => websocket();
createColdObservable<T>(observableFactory: () => Observable<T>): Observable<T> {
return new Observable((subscriber) => {
const subscription = observableFactory().subscribe(subscriber);
return () => {
subscription.unsubscribe();
};
});
}
일반적으로는 defer()를 이용해서 구독 시점까지 방출을 지연시키는 방법이 있음