本文基于 RxJava 2.1.2 。根据代码和输出日志会更容易理解。
RxJava 的线程模型如下:
1. 不指定线程的情况
- 不指定线程也就是不使用 observeOn和subscribeOn,所有操作在调用subscribe的线程执行。
@Test
public void noThread() {
    buildObservable().subscribe();
}
上面代码的输出为:
Thread[main]   execute   Action start emmit
Thread[main]   execute   Operation-1, event: 1
Thread[main]   execute   Operation-2, event: 1
2. subscribeOn
- subscribeOn不管调用多少次,只以第一次为准。如果只使用了- subscribeOn、没有使用- observeOn,则所有操作在第一次调用生成的线程里执行。
@Test
public void subscribeOn() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Observable<Integer> observable = buildObservable();
    observable
        .subscribeOn(scheduler("subscribeOn-1"))
        .subscribeOn(scheduler("subscribeOn-2"))
        .subscribe(i -> {
            showMessageWithThreadName("Action subscribe");
            latch.countDown();
        });
    latch.await();
}
上面代码的输出为:
create scheduler subscribeOn-2
create scheduler subscribeOn-1
Thread[subscribeOn-1]   execute   Action start emmit
Thread[subscribeOn-1]   execute   Operation-1, event: 1
Thread[subscribeOn-1]   execute   Operation-2, event: 1
Thread[subscribeOn-1]   execute   Action subscribe
3. observeOn
- observeOn必须跟- subscribeOn一起使用,单独使用会抛出空引用异常。
- observeOn应在- subscribeOn的后面调用,否则会出现死锁的情况。
- observeOn操作会更改后续操作的执行线程,直至下一个- observeOn调用之前的操作或- subscribe操作。