RxJava2 에서, 하나의 Observable에는 하나의 쓰레드만 작동한다. 파이프라인에 스케쥴러를 사용해도 요소가 병렬로 처리되지는 않는다.
예로, 아래 코드를 보자:
...
Observable.create(new ObserveOnSubscribe<Item>() {
@Override
public void subscribe(ObservableEmitter<Item> emitter) throws Exception {
while (true) {
emitter.onNext(/* Item */);
Thread.sleep(1000);
}
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Item>() {
@Override
public void accept(Item item) throws Exception {
Thread.sleep(3000);
}
})
...
source는 1초에 한 번 Item을 발행한다. Schedulers.io()를 사용하였으므로 doOnNext에서 수행하는 로직이 1초를 초과해도 source의 Item 발행이 지연되지는 않는다.
이 때 사용자는 doOnNext의 수행이 1초를 초과하여 한 Item을 처리하기 전에 다음 Item이 발행되면 io 스케쥴러의 또다른 쓰레드가 작동하여 별도로 doOnNext로직을 수행할 거라고 생각할 수 있는데, RxJava2에서 한 Observable은 동시에 하나의 쓰레드만이 작동하기 때문에, doOnNext의 처리가 지연되어도 Item은 보이지 않게 계속 쌓이고, 현재 Item이 완전히 처리되어야 다음 Item을 처리한다.
위 코드에서 doOnNext는 3초의 처리 시간을 갖는다. 위 코드는 다음과 같은 흐름으로 실행된다.
Source 쓰레드] 1번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(1번 아이템)
--1초 경과--
Source 쓰레드] 2번 Item 발행
--1초 경과--
Source 쓰레드] 3번 Item 발행
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(1번 아이템)
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(2번 아이템)
Source 쓰레드] 4번 Item 발행
--1초 경과--
Source 쓰레드] 5번 Item 발행
--1초 경과--
Source 쓰레드] 6번 Item 발행
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(2번 아이템)
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(3번 아이템)
Source 쓰레드] 7번 Item 발행
--1초 경과--
Source 쓰레드] 8번 Item 발행
--1초 경과--
Source 쓰레드] 9번 Item 발행
...
물론 실제로 돌려보면 중간 경과 시간은 정확히 1초가 아닐 것이고, doOnNext 시작/종료와 Item 발행은 순서가 뒤바뀔 수 있다. 여튼 위처럼, 처리 쓰레드는 한 Item을 처리하는 데에 3초가 걸리고, Item은 1초에 하나씩 발행된다.
이처럼, observeOn은 처리 쓰레드를 달리 하는 것이지, 병렬 처리는 완전 별개의 문제다. Item 처리가 지연될 때 다른 쓰레드가 다음 Item을 처리하기를 원하면 다음과 같이 해야한다:
...
Observable.create(new ObserveOnSubscribe<Item>() {
@Override
public void subscribe(ObservableEmitter<Item> emitter) throws Exception {
while (true) {
emitter.onNext(/* Item */);
Thread.sleep(1000);
}
}
})
.flatMap(new Function<Item, Observable<Item>>() {
@Override
public Observable<Item> apply(Item item) throws Exception {
return Observable.just(item)
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Item>() {
@Override
public void accept(Item item) throws Exception {
Thread.sleep(3000);
}
})
}
})
...
Observable.just(item)을 통해 만들어진 Observable은 새로 생성된 별도의 Observable이고, 따라서 하나의 Observable에는 하나의 쓰레드만 작동한다는 성질이 별도로 적용된다. 이제 Item마다 새로운 Observable이 생성되므로 각자에 다른 io 스케쥴러 쓰레드가 할당된다.
위 코드의 실행 흐름은 다음과 같다:
Source 쓰레드] 1번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(1번 아이템)
--1초 경과--
Source 쓰레드] 2번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(2번 아이템)
--1초 경과--
Source 쓰레드] 3번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(3번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(1번 아이템)
Source 쓰레드] 4번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(4번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-2] 수행 종료: doOnNext(2번 아이템)
Source 쓰레드] 5번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(5번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-3] 수행 종료: doOnNext(3번 아이템)
Source 쓰레드] 6번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(6번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(4번 아이템)
Source 쓰레드] 7번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(7번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-2] 수행 종료: doOnNext(5번 아이템)
Source 쓰레드] 8번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(8번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-3] 수행 종료: doOnNext(6번 아이템)
Source 쓰레드] 9번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(9번 아이템)
...
'Java > RxJava' 카테고리의 다른 글
RxJava2 스케쥴러 쓰레드 기아 현상 (Scheduler Thread Starvation Problem) (0) | 2020.06.04 |
---|---|
RxJava2 ConnectableObservable 에러 처리 유의사항 (0) | 2020.05.15 |
RxJava2 요소에 따라 파이프라인 다르게 적용하기 (0) | 2020.05.12 |
RxJava2 오퍼레이터 - do* 시리즈와 실행 순서 (0) | 2020.04.24 |
subscribeOn, observeOn (0) | 2020.04.21 |