ReactiveX
ReactiveX 是一个用于异步编程的 API 规范。 ReactiveX 结合了 Observer 模式、Iterator 模式和函数式编程的最佳理念。
ReactiveX 带来了更好的代码基础:
- Functional, 函数式:避免了复杂的有状态的程序,在可观察流上使用干净(无副作用)的 输入/输出 函数。
- Less is more, 少即是多:ReactiveX 的操作子通常把精心制作的修改简化为几行代码。
- Async error handling, 异步错误处理:传统的 try/catch 对于异步计算的错误非常乏力,但 ReactiveX 具有恰当的机制来处理错误。
- Concurrency made easy, 更容易的并发:ReactiveX 的 Observables 和 Schedulers 允许程序员从底层的线程、同步和并发问题中抽象出来。
RxJava
RxJava 是 ReactiveX 在 Java 编程语言里的一个实现。
基本概念:
- 事件:主题生成的、订阅者感兴趣的东西。
- 订阅者:Observer,抽象基类是
Subscriber
。 - 主题:被观察的对象,抽象基类是
Observable
。每个主题都有一个OnSubscribe
的实例,OnSubscribe
从类名看是对订阅行为的反应,其call(Subscriber subscriber)
方法封装了事件发生、通知的逻辑,供每次订阅时调用。 -
订阅:subscribe,是一种动作,RxJava 在订阅时建立主题与监听者的关系,每次订阅,主题都会调用其内部
OnSubscribe.call(Subscriber subscriber)
方法。 -
对于
Observable.doOnNext/doOnCompleted/doOnError/doOnEach/map
这类中间操作,生成一个新的订阅者Subscriber
,封装了相关行为,用于添加新的逻辑,并代理了对之前订阅者的调用;用新的订阅者和当前主题创建新的主题并返回。(采用的是包装器模式)
核心代码
rx.Observable
:
// 创建主题
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
// 订阅
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
// 精简后的订阅行为
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
// 主题在 call 方法里 通知订阅者 事件
observable.onSubscribe.call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
// 中间操作。把 onNext 行为包装一个动作里
public final Observable<T> doOnNext(final Action1<? super T> onNext) {
Action1<Throwable> onError = Actions.empty();
Action0 onCompleted = Actions.empty();
Observer<T> observer = new ActionObserver<T>(onNext, onError, onCompleted);
return create(new OnSubscribeDoOnEach<T>(this, observer));
}
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
rx.internal.operators.OnSubscribeDoOnEach
源码,从这个实现可以看到,直到 subscribe
监听时才会把监听者包装起来,然后事件流过所有的中间处理器,最终传到监听者。
public class OnSubscribeDoOnEach<T> implements OnSubscribe<T> {
private final Observer<? super T> doOnEachObserver;
private final Observable<T> source;
public OnSubscribeDoOnEach(Observable<T> source, Observer<? super T> doOnEachObserver) {
this.source = source;
this.doOnEachObserver = doOnEachObserver;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
source.unsafeSubscribe(new DoOnEachSubscriber<T>(subscriber, doOnEachObserver));
}
private static final class DoOnEachSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> subscriber;
private final Observer<? super T> doOnEachObserver;
private boolean done;
DoOnEachSubscriber(Subscriber<? super T> subscriber, Observer<? super T> doOnEachObserver) {
super(subscriber);
this.subscriber = subscriber;
this.doOnEachObserver = doOnEachObserver;
}
@Override
public void onCompleted() {
if (done) {
return;
}
try {
doOnEachObserver.onCompleted();
} catch (Throwable e) {
Exceptions.throwOrReport(e, this);
return;
}
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
done = true;
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
try {
doOnEachObserver.onError(e);
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
subscriber.onError(new CompositeException(Arrays.asList(e, e2)));
return;
}
subscriber.onError(e);
}
@Override
public void onNext(T value) {
if (done) {
return;
}
try {
doOnEachObserver.onNext(value);
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, value);
return;
}
subscriber.onNext(value);
}
}
}
doOnNext/doOnCompleted
等中间操作通过包装器模式把 Observable 封装为 OnSubsrcibe
的子类实现,持有要添加的逻辑。
OnSubscribe
的 call
方法调用时,把持有的中间操作包装在 传入的订阅者 外层 作为新的订阅者,新订阅者作为参数传入 解包装出来的 Observable.unsafeSubscribe
,该方法又会调用到其持有的 OnSubscribe
的 call
方法上,形成一个递归调用,直至最开始的 OnSubscribe
对象,该对象的 call
方法里执行 事件生成、通知的 调用,事件通过 Observer
的层层包装调用最终到业务的订阅者。
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。