Point 1. subscribeOn 은 업스트림과 다운스트림에, observeOn 은 다운스트림에 영향을 준다는 점을 기억한다.

Point 2. 하나의 파이프라인에는 하나의 쓰레드만 작동한다. (flatMap 을 통한 병렬화를 하지 않을 경우.)

Point 3. 워커 쓰레드가 요소를 처리 중일 때 다음 요소가 들어오면 이 요소는 queuing되고, 현재 요소의 처리가 끝나면 queuing된 다음 요소를 처리한다.

Point 4. 처리 지연에 대비하여 별도 스케쥴러 워커에 실행시킬 로직이 있고, observeOn 을 사용하려 한다면 이 로직은 반드시 observeOn 다음에 위치시켜야 한다.

 

 

스케쥴러 쓰레드 기아 현상은 다음과 같은 경우 발생한다:

// 아래 두 source는 1초에 한 번 Item을 발행하는 Hot Observable 이라 가정한다.
Observable<Item> source1 = ... // observeOn 스케쥴링이 걸린 소스 1 (워커A)
Observable<Item> source2 = ... // observeOn 스케쥴링이 걸린 소스 2 (워커B)

Observable.merge(source1, source2)
		.subscribe(item -> {
        	... // 1초 넘는 지연 발생
		});

위 코드에서 작업을 수행하는 쓰레드는 워커A, 워커B 두 쓰레드이다. observeOn은 다운스트림으로 subscribe까지 영향을 미치므로, 위 코드에서 merge 의 subscribe는 Item이 방출된 워커 쓰레드가 수행하려 한다. (source1에서 나온 Item은 워커A가, source2에서 나온 Item은 워커B가.)

 

그런데 subscribe에서 지연이 발생할 경우 이야기는 달라지고, 위 코드는 다음과 같은 결과를 낳는다. (워커의 순서는 무관하며, Item 구분을 위해 워커A의 아이템은 ItemA-{순서}, 워커B의 아이템은 ItemB-{순서} 라 칭한다.)

 

1. 워커A가 ItemA-1을 방출하고 subscribe 안의 무거운 로직을 처리하러 들어간다.

2. 워커B가 ItemB-1을 방출하고 subscribe 으로 Item을 보낸다. 이 때 워커A가 이미 ItemA-1을 처리하는 중이기 때문에 ItemB-1은 queuing된다.

3. 워커A는 ItemA-1의 처리를 마치고 queuing되었던 ItemB-1 처리를 시작한다.

4. 워커B가 ItemB-2을 방출한다. 워커A는 워커B의 첫 번째 ItemB-1을 처리하는 중이기 때문에 이 ItemB-2는 queuing된다.

5. 워커A는 ItemB-1의 처리를 마치고 ItemB-2 처리를 시작한다.

6. 워커B는 ItemB-3를 방출한다. 워커A는 ItemB-2를 처리하는 중이기 때문에 ItemB-3는 queuing된다.

7. 워커A는 ItemB-2의 처리를 마치고 ItemB-3 처리를 시작한다.

6. 워커B는 ItemB-4를 방출한다. 워커A는 ItemB-3를 처리하는 중이기 때문에 ItemB-4는 queuing된다.

...

 

위와 같이 워커A는 merged Observable로 queuing되는 Item을 처리하기 위해 계속 subscribe안에 머물게 된다. 문제는 source Observable이 Hot Observable이라는 것이다. 워커B가 ItemB-*를 계속해서 내보내고 queuing시키는 동안 ItemA-*또한 발생할 수 있는데, 워커A가 merged Observable에서 ItemB-*를 처리하느라 ItemA-*를 핸들링하지 못한다. merged Observable에 먼저 진입한 워커 쓰레드는 갇히게 되는 것이다.

 

Hot Observable이 아니더라도 이 현상은 똑같이 발생할 수 있는데, Cold Observable의 경우 모든 ItemB-* 처리를 마치고 나서야 ItemA-* 핸들링을 시작하게 된다. Hot Observable은 무한히 갇히게 된다는 점, Cold Observable은 마치 concatated Observable (Observable.concat(...)) 처럼, 하나의 source를 완전히 마치기 전까지 다른 source를 처리하지 못한다. 이는 merge 를 사용한 의도에 부합되지 않는다.

 

이 현상을 방지하기 위해서는 merged Observable의 로직 처리 또한 다른 워커 쓰레드에게 맡겨야 한다. 그 방법은 다양하겠지만, 대충 아래와 같이 실현할 수 있다:

...
Observable.merge(source1, source2)
		.observeOn(Schedulers.io())
		.subscribe(item -> {
        	... // 1초 넘는 지연 발생
		});

 

 

ConnectableObservable을 사용하여 One publicher-Many subscribers를 구현할 때, onError* 사용 시 subscriber마다 서로 다른 에러 처리 로직을 사용할 수 있다.

 

이 때, publisher 단계에서 에러가 발생하면 원치 않는 Subscriber의 에러 처리 코드가 실행될 수 있음을 유의하여야 한다.

 

아래와 같이 한 Publisher에 세 개의 Subscriber가 있을 때, observer 마다 서로 다른 에러 처리 로직을 가지고 있고, 자신만의 방법으로 에러를 처리하기를 원할 수 있다

 

source (ConnectableObservable) <- observer1, observer2, observer3

 

이 때 source 에서 에러가 발생하면 이 에러는 observer의 onError* 로 흐르게 된다. 이를 원치 않는다면, observer에서 flatMap-Observable.just를 사용하여 별도의 내부 파이프라인을 정의해야 한다.

RxJava2 에서, 하나의 Observable에는 하나의 쓰레드만 작동한다. 파이프라인에 스케쥴러를 사용해도 요소가 병렬로 처리되지는 않는다.

 

예로, 아래 코드를 보자:

...
Observable.create(new ObserveOnSubscribe<Item>() {
	@Override
    public void subscribe(ObservableEmitter<Item> emitter) throws Exception {
    	while (true) {
        	emitter.onNext(/* Item */);
            Thread.sleep(1000);
        }
    }
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Item>() {
	@Override
    public void accept(Item item) throws Exception {
    	Thread.sleep(3000);    
    }
})
...

source는 1초에 한 번 Item을 발행한다. Schedulers.io()를 사용하였으므로 doOnNext에서 수행하는 로직이 1초를 초과해도 source의 Item 발행이 지연되지는 않는다.

 

이 때 사용자는 doOnNext의 수행이 1초를 초과하여 한 Item을 처리하기 전에 다음 Item이 발행되면 io 스케쥴러의 또다른 쓰레드가 작동하여 별도로 doOnNext로직을 수행할 거라고 생각할 수 있는데, RxJava2에서 한 Observable은 동시에 하나의 쓰레드만이 작동하기 때문에, doOnNext의 처리가 지연되어도 Item은 보이지 않게 계속 쌓이고, 현재 Item이 완전히 처리되어야 다음 Item을 처리한다.

 

위 코드에서 doOnNext는 3초의 처리 시간을 갖는다. 위 코드는 다음과 같은 흐름으로 실행된다.

Source 쓰레드] 1번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(1번 아이템) 
--1초 경과--
Source 쓰레드] 2번 Item 발행
--1초 경과--
Source 쓰레드] 3번 Item 발행
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(1번 아이템)
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(2번 아이템) 
Source 쓰레드] 4번 Item 발행
--1초 경과--
Source 쓰레드] 5번 Item 발행
--1초 경과--
Source 쓰레드] 6번 Item 발행
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(2번 아이템)
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(3번 아이템) 
Source 쓰레드] 7번 Item 발행
--1초 경과--
Source 쓰레드] 8번 Item 발행
--1초 경과--
Source 쓰레드] 9번 Item 발행
...

물론 실제로 돌려보면 중간 경과 시간은 정확히 1초가 아닐 것이고, doOnNext 시작/종료와 Item 발행은 순서가 뒤바뀔 수 있다. 여튼 위처럼, 처리 쓰레드는 한 Item을 처리하는 데에 3초가 걸리고, Item은 1초에 하나씩 발행된다.

 

이처럼, observeOn은 처리 쓰레드를 달리 하는 것이지, 병렬 처리는 완전 별개의 문제다. Item 처리가 지연될 때 다른 쓰레드가 다음 Item을 처리하기를 원하면 다음과 같이 해야한다:

...
Observable.create(new ObserveOnSubscribe<Item>() {
	@Override
    public void subscribe(ObservableEmitter<Item> emitter) throws Exception {
    	while (true) {
        	emitter.onNext(/* Item */);
            Thread.sleep(1000);
        }
    }
})
.flatMap(new Function<Item, Observable<Item>>() {
	@Override
    public Observable<Item> apply(Item item) throws Exception {
    	return Observable.just(item)
			.observeOn(Schedulers.io())
            .doOnNext(new Consumer<Item>() {
                @Override
                public void accept(Item item) throws Exception {
                    Thread.sleep(3000);    
                }
            })
    }
})
...

Observable.just(item)을 통해 만들어진 Observable은 새로 생성된 별도의 Observable이고, 따라서 하나의 Observable에는 하나의 쓰레드만 작동한다는 성질이 별도로 적용된다. 이제 Item마다 새로운 Observable이 생성되므로 각자에 다른 io 스케쥴러 쓰레드가 할당된다.

 

위 코드의 실행 흐름은 다음과 같다:

Source 쓰레드] 1번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(1번 아이템)
--1초 경과--
Source 쓰레드] 2번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(2번 아이템)
--1초 경과--
Source 쓰레드] 3번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(3번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(1번 아이템) 
Source 쓰레드] 4번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(4번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-2] 수행 종료: doOnNext(2번 아이템)
Source 쓰레드] 5번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(5번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-3] 수행 종료: doOnNext(3번 아이템)
Source 쓰레드] 6번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(6번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(4번 아이템)
Source 쓰레드] 7번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(7번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-2] 수행 종료: doOnNext(5번 아이템)
Source 쓰레드] 8번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(8번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-3] 수행 종료: doOnNext(6번 아이템)
Source 쓰레드] 9번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(9번 아이템)
...

 

 

Point 1. ConnectableObservable을 통해 하나의 source를 다수의 Observer가 구독할 수 있도록 한다.

Point 2. filter로 원하는 조건에 따라 source Observable을 나누고 후에 merge한다.

Point 3. 나뉜 Observable에 ObservableTransformer를 사용하여 원하는대로 오퍼레이터 파이프라인을 선언한다.

 

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import io.reactivex.observables.ConnectableObservable;


public class Main {
	public enum Type {
    	A, B, C
    }
    private static ObservableTransformer(Type, Type) typeAPipeline = new ObjservableTransformer<Type, Type>() {
    	@Override
        public ObservableSource<Type> apply(Observable<Type> observable) {
        	return observable
				.filter(new Predicate<Type>() {
        			@Override
            		public boolean test(Type arg0) throws Exception {
            		return arg0 == Type.A;
            		}
        		})            
            	.doOnNext(new Consumer<Type>(){
            		@Override
            		public void accept(Type arg0) throws Exception {
                		System.out.println("This is a pipeline only for Type A");
                	}
            	});
        }
    };
    private static ObservableTransformer(Type, Type) typeBPipeline = new ObjservableTransformer<Type, Type>() {
    	@Override
        public ObservableSource<Type> apply(Observable<Type> observable) {
        	return observable
				.filter(new Predicate<Type>() {
        			@Override
            		public boolean test(Type arg0) throws Exception {
            		return arg0 == Type.B;
            		}
        		})            
            	.doOnNext(new Consumer<Type>(){
            		@Override
            		public void accept(Type arg0) throws Exception {
                		System.out.println("This is a pipeline only for Type B");
                	}
            	});
        }
    };
    private static ObservableTransformer(Type, Type) typeCPipeline = new ObjservableTransformer<Type, Type>() {
    	@Override
        public ObservableSource<Type> apply(Observable<Type> observable) {
        	return observable
				.filter(new Predicate<Type>() {
        			@Override
            		public boolean test(Type arg0) throws Exception {
            		return arg0 == Type.C;
            		}
        		})            
            	.doOnNext(new Consumer<Type>(){
            		@Override
            		public void accept(Type arg0) throws Exception {
                		System.out.println("This is a pipeline only for Type C");
                	}
            	});
        }
    };
    public static void main(String[] args) throws Exception {
    	final Type types = {Type.A, Type.A, Type.B, Type.C, Type.A, Type.B, Type.B, Type.C};
        ConnectableObservable<Type> source = Observable.fromArray(types).publish();
        Observable<Type> typeAObservable = source.compose(typeAPipeline);
        Observable<Type> typeBObservable = source.compose(typeBPipeline);
        Observable<Type> typeCObservable = source.compose(typeCPipeline);
        Observable.merge(typeAObservable, typeBObservable, typeCObservable)
        			.subscribe();
        source.connect();
    }

}

※ 동일한 오퍼레이션을 두 개 이상 선언하면 doOn* 은 선언 순서대로, doAfter* 은 선언 역순으로 실행된다.

 

doOnComplete

마지막 요소가 방출될 때, onCompelete 바로 전에 실행됨. onComplete와 마찬가지로, 에러가 발생하면 실행되지 않음.

 

doOnDispose

구독을 해지했을 때 실행됨.

 

doOnNext

요소를 방출할 때, onNext 바로 전에 실행됨.

 

doOnError

오퍼레이션 중 에러가 발생했을 경우, onError 바로 전에 실행됨.

 

doOnEach

doOnComplete, doOnNext, doOnError의 통합 버전. Notification<?>을 아규먼트로 받아 isOnComplete, isOnNext, isOnError로 각각에 맞는 처리를 할 수 있음.

 

doOnSubscribe

구독 시, onSubscribe 바로 전에 실행됨.

 

doOnLifecyle

doOnSubscribe, doOnDispose의 통합 버전.

 

doOnTerminate

Observable이 종료될 때, onComplete 또는 onError 바로 전에 실행됨. doOnComplete 또는 doOnError가 선언되어 있다면 선언한 순서대로 실행됨. (업스트림 -> 다운스트림)

 

doAfterNext

요소를 방출할 때, onNext 바로 후에 실행됨.

 

doAfterTerminate

Observable이 종료될 때, onComplete 또는 onError 후에 실행됨. (cancel, dispose시에는 실행되지 않음) doFinnally가 선언되어 있다면 그 후에 실행됨.

 

doFinally

Observable이 종료될 때 실행됨. doAfterTerminate가 Observable의 완료 또는 에러에 대해서만 실행되지만, doFinally는 cancel, dispose시에도 실행된다. doAfterTerminate가 선언되어 있다면 doFinally가 먼저 실행된다.

 

 

실행 순서

doOnSubscribe -> doOnNext -> onNext -> doAfterNext -> 선언 순서에 따라 doOnComplete / doOnError 또는 doOnTermitate -> doFinally -> doAfterTerminate

subscribeOn, observeOn 이 두 가지의 차이에 대해 비교해 놓은 글이 상당히 많다. 

 

그냥 아래 다섯 가지만 기억하자.

 

1. observeOn 은 다운스트림에만 영향을 준다.

2. subscribeOn 은 업스트립, 다운스트림 양쪽에 영향을 준다.

3. 한 Observable에 대한 두 번 이상의 subscribeOn 호출은 첫 번째 호출만 유효하다.

4. 한 Observable에 대한 observeOn 호출은 몇 번이고 유효하다.

5. observeOn으로 변경된 쓰레드는 subscribeOn으로 변경할 수 없다.

 

출처:

https://proandroiddev.com/rx-java-subscribeon-and-observeon-a7d95041ce96

 

RxJava subscribeOn and ObserveOn

SubscribeOn and ObserveOn, two concepts that eludes most Rx beginners . I too belonged to this before my experiments using them. Below is a…

proandroiddev.com

 

 

+ Recent posts