Point 1. subscribeOn 은 업스트림과 다운스트림에, observeOn 은 다운스트림에 영향을 준다는 점을 기억한다.
Point 2. 하나의 파이프라인에는 하나의 쓰레드만 작동한다. (flatMap 을 통한 병렬화를 하지 않을 경우.)
Point 3. 워커 쓰레드가 요소를 처리 중일 때 다음 요소가 들어오면 이 요소는 queuing되고, 현재 요소의 처리가 끝나면 queuing된 다음 요소를 처리한다.
Point 4. 처리 지연에 대비하여 별도 스케쥴러 워커에 실행시킬 로직이 있고, observeOn 을 사용하려 한다면 이 로직은 반드시 observeOn 다음에 위치시켜야 한다.
스케쥴러 쓰레드 기아 현상은 다음과 같은 경우 발생한다:
// 아래 두 source는 1초에 한 번 Item을 발행하는 Hot Observable 이라 가정한다.
Observable<Item> source1 = ... // observeOn 스케쥴링이 걸린 소스 1 (워커A)
Observable<Item> source2 = ... // observeOn 스케쥴링이 걸린 소스 2 (워커B)
Observable.merge(source1, source2)
.subscribe(item -> {
... // 1초 넘는 지연 발생
});
위 코드에서 작업을 수행하는 쓰레드는 워커A, 워커B 두 쓰레드이다. observeOn은 다운스트림으로 subscribe까지 영향을 미치므로, 위 코드에서 merge 의 subscribe는 Item이 방출된 워커 쓰레드가 수행하려 한다. (source1에서 나온 Item은 워커A가, source2에서 나온 Item은 워커B가.)
그런데 subscribe에서 지연이 발생할 경우 이야기는 달라지고, 위 코드는 다음과 같은 결과를 낳는다. (워커의 순서는 무관하며, Item 구분을 위해 워커A의 아이템은 ItemA-{순서}, 워커B의 아이템은 ItemB-{순서} 라 칭한다.)
1. 워커A가 ItemA-1을 방출하고 subscribe 안의 무거운 로직을 처리하러 들어간다.
2. 워커B가 ItemB-1을 방출하고 subscribe 으로 Item을 보낸다. 이 때 워커A가 이미 ItemA-1을 처리하는 중이기 때문에 ItemB-1은 queuing된다.
3. 워커A는 ItemA-1의 처리를 마치고 queuing되었던 ItemB-1 처리를 시작한다.
4. 워커B가 ItemB-2을 방출한다. 워커A는 워커B의 첫 번째 ItemB-1을 처리하는 중이기 때문에 이 ItemB-2는 queuing된다.
5. 워커A는 ItemB-1의 처리를 마치고 ItemB-2 처리를 시작한다.
6. 워커B는 ItemB-3를 방출한다. 워커A는 ItemB-2를 처리하는 중이기 때문에 ItemB-3는 queuing된다.
7. 워커A는 ItemB-2의 처리를 마치고 ItemB-3 처리를 시작한다.
6. 워커B는 ItemB-4를 방출한다. 워커A는 ItemB-3를 처리하는 중이기 때문에 ItemB-4는 queuing된다.
...
위와 같이 워커A는 merged Observable로 queuing되는 Item을 처리하기 위해 계속 subscribe안에 머물게 된다. 문제는 source Observable이 Hot Observable이라는 것이다. 워커B가 ItemB-*를 계속해서 내보내고 queuing시키는 동안 ItemA-*또한 발생할 수 있는데, 워커A가 merged Observable에서 ItemB-*를 처리하느라 ItemA-*를 핸들링하지 못한다. merged Observable에 먼저 진입한 워커 쓰레드는 갇히게 되는 것이다.
Hot Observable이 아니더라도 이 현상은 똑같이 발생할 수 있는데, Cold Observable의 경우 모든 ItemB-* 처리를 마치고 나서야 ItemA-* 핸들링을 시작하게 된다. Hot Observable은 무한히 갇히게 된다는 점, Cold Observable은 마치 concatated Observable (Observable.concat(...)) 처럼, 하나의 source를 완전히 마치기 전까지 다른 source를 처리하지 못한다. 이는 merge 를 사용한 의도에 부합되지 않는다.
이 현상을 방지하기 위해서는 merged Observable의 로직 처리 또한 다른 워커 쓰레드에게 맡겨야 한다. 그 방법은 다양하겠지만, 대충 아래와 같이 실현할 수 있다:
...
Observable.merge(source1, source2)
.observeOn(Schedulers.io())
.subscribe(item -> {
... // 1초 넘는 지연 발생
});
'Java > RxJava' 카테고리의 다른 글
RxJava2 ConnectableObservable 에러 처리 유의사항 (0) | 2020.05.15 |
---|---|
RxJava2 병렬 처리 (0) | 2020.05.13 |
RxJava2 요소에 따라 파이프라인 다르게 적용하기 (0) | 2020.05.12 |
RxJava2 오퍼레이터 - do* 시리즈와 실행 순서 (0) | 2020.04.24 |
subscribeOn, observeOn (0) | 2020.04.21 |