RxJava 线程模型

本文基于 RxJava 2.1.2 。根据代码和输出日志会更容易理解。

RxJava 的线程模型如下:

1. 不指定线程的情况

  • 不指定线程也就是不使用 observeOnsubscribeOn,所有操作在调用 subscribe 的线程执行。
@Test
public void noThread() {
    buildObservable().subscribe();
}

上面代码的输出为:

Thread[main]   execute   Action start emmit
Thread[main]   execute   Operation-1, event: 1
Thread[main]   execute   Operation-2, event: 1

2. subscribeOn

  • subscribeOn 不管调用多少次,只以第一次为准。如果只使用了 subscribeOn、没有使用 observeOn,则所有操作在第一次调用生成的线程里执行。
@Test
public void subscribeOn() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Observable<Integer> observable = buildObservable();
    observable
        .subscribeOn(scheduler("subscribeOn-1"))
        .subscribeOn(scheduler("subscribeOn-2"))
        .subscribe(i -> {
            showMessageWithThreadName("Action subscribe");
            latch.countDown();
        });

    latch.await();
}

上面代码的输出为:

create scheduler subscribeOn-2
create scheduler subscribeOn-1
Thread[subscribeOn-1]   execute   Action start emmit
Thread[subscribeOn-1]   execute   Operation-1, event: 1
Thread[subscribeOn-1]   execute   Operation-2, event: 1
Thread[subscribeOn-1]   execute   Action subscribe

3. observeOn

  • observeOn 必须跟 subscribeOn 一起使用,单独使用会抛出空引用异常。
  • observeOn 应在 subscribeOn 的后面调用,否则会出现死锁的情况。
  • observeOn 操作会更改后续操作的执行线程,直至下一个 observeOn 调用之前的操作或 subscribe 操作。

@Test
public void observeOn() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Observable<Integer> observable = buildObservable();
    observable
        .subscribeOn(scheduler("subscribeOn-1"))
        .observeOn(scheduler("observeOn-1"))
        .doOnNext(i -> {
            showMessageWithThreadName("Operation-3, event: " + i);
        })
        .observeOn(scheduler("observeOn-2"))
        .subscribe(i -> {
            showMessageWithThreadName("subscribe  " + i);
            latch.countDown();
        });

    latch.await();
}

上面代码的输出为:

create scheduler subscribeOn-1
Thread[subscribeOn-1]   execute   Action start emmit
Thread[subscribeOn-1]   execute   Operation-1, event: 1
Thread[subscribeOn-1]   execute   Operation-2, event: 1
create scheduler observeOn-1
Thread[observeOn-1]   execute   Operation-3, event: 1
create scheduler observeOn-2
Thread[observeOn-2]   execute   subscribe  1

4. 辅助代码

// 返回用给定线程名 命名的Scheduler
private Scheduler scheduler(String name) {
    return Schedulers.from(Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            System.out.println("create scheduler " + name);
            Thread t = new Thread(r, name);
            return t;
        }
    }));
}

// 输出当前线程名和给的消息
private void showMessageWithThreadName(String msg) {
    Thread t = Thread.currentThread();
    System.out.printf("%-10s   execute   %s\n", "Thread[" + t.getName() + "]", msg);
}

// 构建一个带有两个中间操作的 Observable
private Observable<Integer> buildObservable() {
    return Observable.fromPublisher((Subscriber<? super Integer> s) -> {
        showMessageWithThreadName("Action start emmit");
        // 消息源
        s.onNext(1);
        s.onComplete();
    })
    .doOnNext(i -> {
        showMessageWithThreadName("Operation-1, event: " + i);
    })
    .doOnNext(i -> {
        showMessageWithThreadName("Operation-2, event: " + i);
    });
}

二. 线程切换实现分析

subscribeOnobserveOn 行为差异的原因是它们的调度时机、执行机制不同。

1. subscribeOn

subscribeOn 是在调用 subscribe 时触发。

subscribeOn 只是把 subscribe 的调用放到给定的调度器上去执行,如果先后声明了两个调度器 S1、S2,则 S2 把动作调度给 S1 后,真正的逻辑是在 S1 上执行,所以没有必须声明两个 subscribeOn 调度器。

2. observeOn

observeOn 是在调用 onNext 时触发。

observeOn 的实现是在 ObservableObserveOn 类里,使用内部类 ObserveOnObserver 作为上游的订阅者、下游的消息来源、即主题。ObserveOnObserver 引入一个队列作为线程协调机制,前面的事件处理线程通知 ObserveOnObserver 时,把事件放入队列,触发调度器执行,然后返回;调度器执行时,不断从队列取出事件进行处理、直至所有事件处理完成。

更多的实现细节可以看源码。


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据