Java/RxJava

RxJava2 병렬 처리

ParkCheolu 2020. 5. 13. 14:17

RxJava2 에서, 하나의 Observable에는 하나의 쓰레드만 작동한다. 파이프라인에 스케쥴러를 사용해도 요소가 병렬로 처리되지는 않는다.

 

예로, 아래 코드를 보자:

...
Observable.create(new ObserveOnSubscribe<Item>() {
	@Override
    public void subscribe(ObservableEmitter<Item> emitter) throws Exception {
    	while (true) {
        	emitter.onNext(/* Item */);
            Thread.sleep(1000);
        }
    }
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Item>() {
	@Override
    public void accept(Item item) throws Exception {
    	Thread.sleep(3000);    
    }
})
...

source는 1초에 한 번 Item을 발행한다. Schedulers.io()를 사용하였으므로 doOnNext에서 수행하는 로직이 1초를 초과해도 source의 Item 발행이 지연되지는 않는다.

 

이 때 사용자는 doOnNext의 수행이 1초를 초과하여 한 Item을 처리하기 전에 다음 Item이 발행되면 io 스케쥴러의 또다른 쓰레드가 작동하여 별도로 doOnNext로직을 수행할 거라고 생각할 수 있는데, RxJava2에서 한 Observable은 동시에 하나의 쓰레드만이 작동하기 때문에, doOnNext의 처리가 지연되어도 Item은 보이지 않게 계속 쌓이고, 현재 Item이 완전히 처리되어야 다음 Item을 처리한다.

 

위 코드에서 doOnNext는 3초의 처리 시간을 갖는다. 위 코드는 다음과 같은 흐름으로 실행된다.

Source 쓰레드] 1번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(1번 아이템) 
--1초 경과--
Source 쓰레드] 2번 Item 발행
--1초 경과--
Source 쓰레드] 3번 Item 발행
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(1번 아이템)
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(2번 아이템) 
Source 쓰레드] 4번 Item 발행
--1초 경과--
Source 쓰레드] 5번 Item 발행
--1초 경과--
Source 쓰레드] 6번 Item 발행
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(2번 아이템)
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(3번 아이템) 
Source 쓰레드] 7번 Item 발행
--1초 경과--
Source 쓰레드] 8번 Item 발행
--1초 경과--
Source 쓰레드] 9번 Item 발행
...

물론 실제로 돌려보면 중간 경과 시간은 정확히 1초가 아닐 것이고, doOnNext 시작/종료와 Item 발행은 순서가 뒤바뀔 수 있다. 여튼 위처럼, 처리 쓰레드는 한 Item을 처리하는 데에 3초가 걸리고, Item은 1초에 하나씩 발행된다.

 

이처럼, observeOn은 처리 쓰레드를 달리 하는 것이지, 병렬 처리는 완전 별개의 문제다. Item 처리가 지연될 때 다른 쓰레드가 다음 Item을 처리하기를 원하면 다음과 같이 해야한다:

...
Observable.create(new ObserveOnSubscribe<Item>() {
	@Override
    public void subscribe(ObservableEmitter<Item> emitter) throws Exception {
    	while (true) {
        	emitter.onNext(/* Item */);
            Thread.sleep(1000);
        }
    }
})
.flatMap(new Function<Item, Observable<Item>>() {
	@Override
    public Observable<Item> apply(Item item) throws Exception {
    	return Observable.just(item)
			.observeOn(Schedulers.io())
            .doOnNext(new Consumer<Item>() {
                @Override
                public void accept(Item item) throws Exception {
                    Thread.sleep(3000);    
                }
            })
    }
})
...

Observable.just(item)을 통해 만들어진 Observable은 새로 생성된 별도의 Observable이고, 따라서 하나의 Observable에는 하나의 쓰레드만 작동한다는 성질이 별도로 적용된다. 이제 Item마다 새로운 Observable이 생성되므로 각자에 다른 io 스케쥴러 쓰레드가 할당된다.

 

위 코드의 실행 흐름은 다음과 같다:

Source 쓰레드] 1번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(1번 아이템)
--1초 경과--
Source 쓰레드] 2번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(2번 아이템)
--1초 경과--
Source 쓰레드] 3번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(3번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(1번 아이템) 
Source 쓰레드] 4번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(4번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-2] 수행 종료: doOnNext(2번 아이템)
Source 쓰레드] 5번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(5번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-3] 수행 종료: doOnNext(3번 아이템)
Source 쓰레드] 6번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(6번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-1] 수행 종료: doOnNext(4번 아이템)
Source 쓰레드] 7번 Item 발행
io 스케쥴러 쓰레드-1] 수행 시작: doOnNext(7번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-2] 수행 종료: doOnNext(5번 아이템)
Source 쓰레드] 8번 Item 발행
io 스케쥴러 쓰레드-2] 수행 시작: doOnNext(8번 아이템)
--1초 경과--
io 스케쥴러 쓰레드-3] 수행 종료: doOnNext(6번 아이템)
Source 쓰레드] 9번 Item 발행
io 스케쥴러 쓰레드-3] 수행 시작: doOnNext(9번 아이템)
...