Java/RxJava
RxJava2 요소에 따라 파이프라인 다르게 적용하기
ParkCheolu
2020. 5. 12. 14:10
Point 1. ConnectableObservable을 통해 하나의 source를 다수의 Observer가 구독할 수 있도록 한다.
Point 2. filter로 원하는 조건에 따라 source Observable을 나누고 후에 merge한다.
Point 3. 나뉜 Observable에 ObservableTransformer를 사용하여 원하는 대로 오퍼레이터 파이프라인을 선언한다.
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import io.reactivex.observables.ConnectableObservable;
public class Main {
public enum Type {
A, B, C
}
private static ObservableTransformer(Type, Type) typeAPipeline = new ObjservableTransformer<Type, Type>() {
@Override
public ObservableSource<Type> apply(Observable<Type> observable) {
return observable
.filter(new Predicate<Type>() {
@Override
public boolean test(Type arg0) throws Exception {
return arg0 == Type.A;
}
})
.doOnNext(new Consumer<Type>(){
@Override
public void accept(Type arg0) throws Exception {
System.out.println("This is a pipeline only for Type A");
}
});
}
};
private static ObservableTransformer(Type, Type) typeBPipeline = new ObjservableTransformer<Type, Type>() {
@Override
public ObservableSource<Type> apply(Observable<Type> observable) {
return observable
.filter(new Predicate<Type>() {
@Override
public boolean test(Type arg0) throws Exception {
return arg0 == Type.B;
}
})
.doOnNext(new Consumer<Type>(){
@Override
public void accept(Type arg0) throws Exception {
System.out.println("This is a pipeline only for Type B");
}
});
}
};
private static ObservableTransformer(Type, Type) typeCPipeline = new ObjservableTransformer<Type, Type>() {
@Override
public ObservableSource<Type> apply(Observable<Type> observable) {
return observable
.filter(new Predicate<Type>() {
@Override
public boolean test(Type arg0) throws Exception {
return arg0 == Type.C;
}
})
.doOnNext(new Consumer<Type>(){
@Override
public void accept(Type arg0) throws Exception {
System.out.println("This is a pipeline only for Type C");
}
});
}
};
public static void main(String[] args) throws Exception {
final Type types = {Type.A, Type.A, Type.B, Type.C, Type.A, Type.B, Type.B, Type.C};
ConnectableObservable<Type> source = Observable.fromArray(types).publish();
Observable<Type> typeAObservable = source.compose(typeAPipeline);
Observable<Type> typeBObservable = source.compose(typeBPipeline);
Observable<Type> typeCObservable = source.compose(typeCPipeline);
Observable.merge(typeAObservable, typeBObservable, typeCObservable)
.subscribe();
source.connect();
}
}