Point 1. ConnectableObservable을 통해 하나의 source를 다수의 Observer가 구독할 수 있도록 한다.
Point 2. filter로 원하는 조건에 따라 source Observable을 나누고 후에 merge한다.
Point 3. 나뉜 Observable에 ObservableTransformer를 사용하여 원하는대로 오퍼레이터 파이프라인을 선언한다.
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import io.reactivex.observables.ConnectableObservable;
public class Main {
public enum Type {
A, B, C
}
private static ObservableTransformer(Type, Type) typeAPipeline = new ObjservableTransformer<Type, Type>() {
@Override
public ObservableSource<Type> apply(Observable<Type> observable) {
return observable
.filter(new Predicate<Type>() {
@Override
public boolean test(Type arg0) throws Exception {
return arg0 == Type.A;
}
})
.doOnNext(new Consumer<Type>(){
@Override
public void accept(Type arg0) throws Exception {
System.out.println("This is a pipeline only for Type A");
}
});
}
};
private static ObservableTransformer(Type, Type) typeBPipeline = new ObjservableTransformer<Type, Type>() {
@Override
public ObservableSource<Type> apply(Observable<Type> observable) {
return observable
.filter(new Predicate<Type>() {
@Override
public boolean test(Type arg0) throws Exception {
return arg0 == Type.B;
}
})
.doOnNext(new Consumer<Type>(){
@Override
public void accept(Type arg0) throws Exception {
System.out.println("This is a pipeline only for Type B");
}
});
}
};
private static ObservableTransformer(Type, Type) typeCPipeline = new ObjservableTransformer<Type, Type>() {
@Override
public ObservableSource<Type> apply(Observable<Type> observable) {
return observable
.filter(new Predicate<Type>() {
@Override
public boolean test(Type arg0) throws Exception {
return arg0 == Type.C;
}
})
.doOnNext(new Consumer<Type>(){
@Override
public void accept(Type arg0) throws Exception {
System.out.println("This is a pipeline only for Type C");
}
});
}
};
public static void main(String[] args) throws Exception {
final Type types = {Type.A, Type.A, Type.B, Type.C, Type.A, Type.B, Type.B, Type.C};
ConnectableObservable<Type> source = Observable.fromArray(types).publish();
Observable<Type> typeAObservable = source.compose(typeAPipeline);
Observable<Type> typeBObservable = source.compose(typeBPipeline);
Observable<Type> typeCObservable = source.compose(typeCPipeline);
Observable.merge(typeAObservable, typeBObservable, typeCObservable)
.subscribe();
source.connect();
}
}
'Java > RxJava' 카테고리의 다른 글
RxJava2 스케쥴러 쓰레드 기아 현상 (Scheduler Thread Starvation Problem) (0) | 2020.06.04 |
---|---|
RxJava2 ConnectableObservable 에러 처리 유의사항 (0) | 2020.05.15 |
RxJava2 병렬 처리 (0) | 2020.05.13 |
RxJava2 오퍼레이터 - do* 시리즈와 실행 순서 (0) | 2020.04.24 |
subscribeOn, observeOn (0) | 2020.04.21 |