Point 1. ConnectableObservable을 통해 하나의 source를 다수의 Observer가 구독할 수 있도록 한다.

Point 2. filter로 원하는 조건에 따라 source Observable을 나누고 후에 merge한다.

Point 3. 나뉜 Observable에 ObservableTransformer를 사용하여 원하는 대로 오퍼레이터 파이프라인을 선언한다.

 

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import io.reactivex.observables.ConnectableObservable;


public class Main {
	public enum Type {
    	A, B, C
    }
    private static ObservableTransformer(Type, Type) typeAPipeline = new ObjservableTransformer<Type, Type>() {
    	@Override
        public ObservableSource<Type> apply(Observable<Type> observable) {
        	return observable
				.filter(new Predicate<Type>() {
        			@Override
            		public boolean test(Type arg0) throws Exception {
            		return arg0 == Type.A;
            		}
        		})            
            	.doOnNext(new Consumer<Type>(){
            		@Override
            		public void accept(Type arg0) throws Exception {
                		System.out.println("This is a pipeline only for Type A");
                	}
            	});
        }
    };
    private static ObservableTransformer(Type, Type) typeBPipeline = new ObjservableTransformer<Type, Type>() {
    	@Override
        public ObservableSource<Type> apply(Observable<Type> observable) {
        	return observable
				.filter(new Predicate<Type>() {
        			@Override
            		public boolean test(Type arg0) throws Exception {
            		return arg0 == Type.B;
            		}
        		})            
            	.doOnNext(new Consumer<Type>(){
            		@Override
            		public void accept(Type arg0) throws Exception {
                		System.out.println("This is a pipeline only for Type B");
                	}
            	});
        }
    };
    private static ObservableTransformer(Type, Type) typeCPipeline = new ObjservableTransformer<Type, Type>() {
    	@Override
        public ObservableSource<Type> apply(Observable<Type> observable) {
        	return observable
				.filter(new Predicate<Type>() {
        			@Override
            		public boolean test(Type arg0) throws Exception {
            		return arg0 == Type.C;
            		}
        		})            
            	.doOnNext(new Consumer<Type>(){
            		@Override
            		public void accept(Type arg0) throws Exception {
                		System.out.println("This is a pipeline only for Type C");
                	}
            	});
        }
    };
    public static void main(String[] args) throws Exception {
    	final Type types = {Type.A, Type.A, Type.B, Type.C, Type.A, Type.B, Type.B, Type.C};
        ConnectableObservable<Type> source = Observable.fromArray(types).publish();
        Observable<Type> typeAObservable = source.compose(typeAPipeline);
        Observable<Type> typeBObservable = source.compose(typeBPipeline);
        Observable<Type> typeCObservable = source.compose(typeCPipeline);
        Observable.merge(typeAObservable, typeBObservable, typeCObservable)
        			.subscribe();
        source.connect();
    }

}

+ Recent posts