本文基于 RxJava 2.1.2 。根据代码和输出日志会更容易理解。
RxJava 的线程模型如下:
1. 不指定线程的情况
- 不指定线程也就是不使用
observeOn
和subscribeOn
,所有操作在调用subscribe
的线程执行。
@Test
public void noThread() {
buildObservable().subscribe();
}
上面代码的输出为:
Thread[main] execute Action start emmit
Thread[main] execute Operation-1, event: 1
Thread[main] execute Operation-2, event: 1
2. subscribeOn
subscribeOn
不管调用多少次,只以第一次为准。如果只使用了subscribeOn
、没有使用observeOn
,则所有操作在第一次调用生成的线程里执行。
@Test
public void subscribeOn() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> observable = buildObservable();
observable
.subscribeOn(scheduler("subscribeOn-1"))
.subscribeOn(scheduler("subscribeOn-2"))
.subscribe(i -> {
showMessageWithThreadName("Action subscribe");
latch.countDown();
});
latch.await();
}
上面代码的输出为:
create scheduler subscribeOn-2
create scheduler subscribeOn-1
Thread[subscribeOn-1] execute Action start emmit
Thread[subscribeOn-1] execute Operation-1, event: 1
Thread[subscribeOn-1] execute Operation-2, event: 1
Thread[subscribeOn-1] execute Action subscribe
3. observeOn
observeOn
必须跟subscribeOn
一起使用,单独使用会抛出空引用异常。observeOn
应在subscribeOn
的后面调用,否则会出现死锁的情况。observeOn
操作会更改后续操作的执行线程,直至下一个observeOn
调用之前的操作或subscribe
操作。
@Test
public void observeOn() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> observable = buildObservable();
observable
.subscribeOn(scheduler("subscribeOn-1"))
.observeOn(scheduler("observeOn-1"))
.doOnNext(i -> {
showMessageWithThreadName("Operation-3, event: " + i);
})
.observeOn(scheduler("observeOn-2"))
.subscribe(i -> {
showMessageWithThreadName("subscribe " + i);
latch.countDown();
});
latch.await();
}
上面代码的输出为:
create scheduler subscribeOn-1
Thread[subscribeOn-1] execute Action start emmit
Thread[subscribeOn-1] execute Operation-1, event: 1
Thread[subscribeOn-1] execute Operation-2, event: 1
create scheduler observeOn-1
Thread[observeOn-1] execute Operation-3, event: 1
create scheduler observeOn-2
Thread[observeOn-2] execute subscribe 1
4. 辅助代码
// 返回用给定线程名 命名的Scheduler
private Scheduler scheduler(String name) {
return Schedulers.from(Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
System.out.println("create scheduler " + name);
Thread t = new Thread(r, name);
return t;
}
}));
}
// 输出当前线程名和给的消息
private void showMessageWithThreadName(String msg) {
Thread t = Thread.currentThread();
System.out.printf("%-10s execute %s\n", "Thread[" + t.getName() + "]", msg);
}
// 构建一个带有两个中间操作的 Observable
private Observable<Integer> buildObservable() {
return Observable.fromPublisher((Subscriber<? super Integer> s) -> {
showMessageWithThreadName("Action start emmit");
// 消息源
s.onNext(1);
s.onComplete();
})
.doOnNext(i -> {
showMessageWithThreadName("Operation-1, event: " + i);
})
.doOnNext(i -> {
showMessageWithThreadName("Operation-2, event: " + i);
});
}
二. 线程切换实现分析
subscribeOn
和 observeOn
行为差异的原因是它们的调度时机、执行机制不同。
1. subscribeOn
subscribeOn
是在调用 subscribe
时触发。
subscribeOn
只是把 subscribe
的调用放到给定的调度器上去执行,如果先后声明了两个调度器 S1、S2,则 S2 把动作调度给 S1 后,真正的逻辑是在 S1 上执行,所以没有必须声明两个 subscribeOn 调度器。
2. observeOn
observeOn
是在调用 onNext
时触发。
observeOn
的实现是在 ObservableObserveOn
类里,使用内部类 ObserveOnObserver
作为上游的订阅者、下游的消息来源、即主题。ObserveOnObserver
引入一个队列作为线程协调机制,前面的事件处理线程通知 ObserveOnObserver
时,把事件放入队列,触发调度器执行,然后返回;调度器执行时,不断从队列取出事件进行处理、直至所有事件处理完成。
更多的实现细节可以看源码。
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。