RxJava

ReactiveX

ReactiveX 是一个用于异步编程的 API 规范。 ReactiveX 结合了 Observer 模式、Iterator 模式和函数式编程的最佳理念。

ReactiveX 带来了更好的代码基础:

  • Functional, 函数式:避免了复杂的有状态的程序,在可观察流上使用干净(无副作用)的 输入/输出 函数。
  • Less is more, 少即是多:ReactiveX 的操作子通常把精心制作的修改简化为几行代码。
  • Async error handling, 异步错误处理:传统的 try/catch 对于异步计算的错误非常乏力,但 ReactiveX 具有恰当的机制来处理错误。
  • Concurrency made easy, 更容易的并发:ReactiveX 的 Observables 和 Schedulers 允许程序员从底层的线程、同步和并发问题中抽象出来。

RxJava

RxJava 是 ReactiveX 在 Java 编程语言里的一个实现。

基本概念:

  • 事件:主题生成的、订阅者感兴趣的东西。
  • 订阅者:Observer,抽象基类是 Subscriber
  • 主题:被观察的对象,抽象基类是 Observable。每个主题都有一个 OnSubscribe 的实例,OnSubscribe 从类名看是对订阅行为的反应,其 call(Subscriber subscriber) 方法封装了事件发生、通知的逻辑,供每次订阅时调用。
  • 订阅:subscribe,是一种动作,RxJava 在订阅时建立主题与监听者的关系,每次订阅,主题都会调用其内部 OnSubscribe.call(Subscriber subscriber) 方法。

  • 对于 Observable.doOnNext/doOnCompleted/doOnError/doOnEach/map 这类中间操作,生成一个新的订阅者 Subscriber,封装了相关行为,用于添加新的逻辑,并代理了对之前订阅者的调用;用新的订阅者和当前主题创建新的主题并返回。(采用的是包装器模式)

核心代码

rx.Observable

// 创建主题
protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

// 订阅
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

// 精简后的订阅行为
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    subscriber.onStart();

    // 主题在 call 方法里 通知订阅者 事件
    observable.onSubscribe.call(subscriber);
    return RxJavaHooks.onObservableReturn(subscriber);
}

// 中间操作。把 onNext 行为包装一个动作里
public final Observable<T> doOnNext(final Action1<? super T> onNext) {
    Action1<Throwable> onError = Actions.empty();
    Action0 onCompleted = Actions.empty();
    Observer<T> observer = new ActionObserver<T>(onNext, onError, onCompleted);

    return create(new OnSubscribeDoOnEach<T>(this, observer));
}

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        subscriber.onStart();

        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
}

rx.internal.operators.OnSubscribeDoOnEach 源码,从这个实现可以看到,直到 subscribe 监听时才会把监听者包装起来,然后事件流过所有的中间处理器,最终传到监听者。

public class OnSubscribeDoOnEach<T> implements OnSubscribe<T> {
    private final Observer<? super T> doOnEachObserver;
    private final Observable<T> source;

    public OnSubscribeDoOnEach(Observable<T> source, Observer<? super T> doOnEachObserver) {
        this.source = source;
        this.doOnEachObserver = doOnEachObserver;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        source.unsafeSubscribe(new DoOnEachSubscriber<T>(subscriber, doOnEachObserver));
    }

    private static final class DoOnEachSubscriber<T> extends Subscriber<T> {
        private final Subscriber<? super T> subscriber;
        private final Observer<? super T> doOnEachObserver;
        private boolean done;

        DoOnEachSubscriber(Subscriber<? super T> subscriber, Observer<? super T> doOnEachObserver) {
            super(subscriber);
            this.subscriber = subscriber;
            this.doOnEachObserver = doOnEachObserver;
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            try {
                doOnEachObserver.onCompleted();
            } catch (Throwable e) {
                Exceptions.throwOrReport(e, this);
                return;
            }
            // Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
            done = true;
            subscriber.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;
            try {
                doOnEachObserver.onError(e);
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                subscriber.onError(new CompositeException(Arrays.asList(e, e2)));
                return;
            }
            subscriber.onError(e);
        }

        @Override
        public void onNext(T value) {
            if (done) {
                return;
            }
            try {
                doOnEachObserver.onNext(value);
            } catch (Throwable e) {
                Exceptions.throwOrReport(e, this, value);
                return;
            }
            subscriber.onNext(value);
        }
    }
}

doOnNext/doOnCompleted 等中间操作通过包装器模式把 Observable 封装为 OnSubsrcibe 的子类实现,持有要添加的逻辑。

OnSubscribecall 方法调用时,把持有的中间操作包装在 传入的订阅者 外层 作为新的订阅者,新订阅者作为参数传入 解包装出来的 Observable.unsafeSubscribe ,该方法又会调用到其持有的 OnSubscribecall 方法上,形成一个递归调用,直至最开始的 OnSubscribe 对象,该对象的 call 方法里执行 事件生成、通知的 调用,事件通过 Observer 的层层包装调用最终到业务的订阅者。

发表评论

电子邮件地址不会被公开。 必填项已用*标注