本文基于 RxJava 2.1.2 。根据代码和输出日志会更容易理解。
RxJava 的线程模型如下:
1. 不指定线程的情况
- 不指定线程也就是不使用
observeOn
和subscribeOn
,所有操作在调用subscribe
的线程执行。
@Test
public void noThread() {
buildObservable().subscribe();
}
上面代码的输出为:
Thread[main] execute Action start emmit
Thread[main] execute Operation-1, event: 1
Thread[main] execute Operation-2, event: 1
2. subscribeOn
subscribeOn
不管调用多少次,只以第一次为准。如果只使用了subscribeOn
、没有使用observeOn
,则所有操作在第一次调用生成的线程里执行。
@Test
public void subscribeOn() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> observable = buildObservable();
observable
.subscribeOn(scheduler("subscribeOn-1"))
.subscribeOn(scheduler("subscribeOn-2"))
.subscribe(i -> {
showMessageWithThreadName("Action subscribe");
latch.countDown();
});
latch.await();
}
上面代码的输出为:
create scheduler subscribeOn-2
create scheduler subscribeOn-1
Thread[subscribeOn-1] execute Action start emmit
Thread[subscribeOn-1] execute Operation-1, event: 1
Thread[subscribeOn-1] execute Operation-2, event: 1
Thread[subscribeOn-1] execute Action subscribe
3. observeOn
observeOn
必须跟subscribeOn
一起使用,单独使用会抛出空引用异常。observeOn
应在subscribeOn
的后面调用,否则会出现死锁的情况。observeOn
操作会更改后续操作的执行线程,直至下一个observeOn
调用之前的操作或subscribe
操作。