RxJava 线程模型

本文基于 RxJava 2.1.2 。根据代码和输出日志会更容易理解。

RxJava 的线程模型如下:

1. 不指定线程的情况

  • 不指定线程也就是不使用 observeOnsubscribeOn,所有操作在调用 subscribe 的线程执行。
@Test
public void noThread() {
    buildObservable().subscribe();
}

上面代码的输出为:

Thread[main]   execute   Action start emmit
Thread[main]   execute   Operation-1, event: 1
Thread[main]   execute   Operation-2, event: 1

2. subscribeOn

  • subscribeOn 不管调用多少次,只以第一次为准。如果只使用了 subscribeOn、没有使用 observeOn,则所有操作在第一次调用生成的线程里执行。
@Test
public void subscribeOn() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Observable<Integer> observable = buildObservable();
    observable
        .subscribeOn(scheduler("subscribeOn-1"))
        .subscribeOn(scheduler("subscribeOn-2"))
        .subscribe(i -> {
            showMessageWithThreadName("Action subscribe");
            latch.countDown();
        });

    latch.await();
}

上面代码的输出为:

create scheduler subscribeOn-2
create scheduler subscribeOn-1
Thread[subscribeOn-1]   execute   Action start emmit
Thread[subscribeOn-1]   execute   Operation-1, event: 1
Thread[subscribeOn-1]   execute   Operation-2, event: 1
Thread[subscribeOn-1]   execute   Action subscribe

3. observeOn

  • observeOn 必须跟 subscribeOn 一起使用,单独使用会抛出空引用异常。
  • observeOn 应在 subscribeOn 的后面调用,否则会出现死锁的情况。
  • observeOn 操作会更改后续操作的执行线程,直至下一个 observeOn 调用之前的操作或 subscribe 操作。

继续阅读

RxJava

ReactiveX

ReactiveX 是一个用于异步编程的 API 规范。 ReactiveX 结合了 Observer 模式、Iterator 模式和函数式编程的最佳理念。

ReactiveX 带来了更好的代码基础:

  • Functional, 函数式:避免了复杂的有状态的程序,在可观察流上使用干净(无副作用)的 输入/输出 函数。
  • Less is more, 少即是多:ReactiveX 的操作子通常把精心制作的修改简化为几行代码。
  • Async error handling, 异步错误处理:传统的 try/catch 对于异步计算的错误非常乏力,但 ReactiveX 具有恰当的机制来处理错误。
  • Concurrency made easy, 更容易的并发:ReactiveX 的 Observables 和 Schedulers 允许程序员从底层的线程、同步和并发问题中抽象出来。

RxJava

RxJava 是 ReactiveX 在 Java 编程语言里的一个实现。

基本概念:

  • 事件:主题生成的、订阅者感兴趣的东西。
  • 订阅者:Observer,抽象基类是 Subscriber
  • 主题:被观察的对象,抽象基类是 Observable。每个主题都有一个 OnSubscribe 的实例,OnSubscribe 从类名看是对订阅行为的反应,其 call(Subscriber subscriber) 方法封装了事件发生、通知的逻辑,供每次订阅时调用。
  • 订阅:subscribe,是一种动作,RxJava 在订阅时建立主题与监听者的关系,每次订阅,主题都会调用其内部 OnSubscribe.call(Subscriber subscriber) 方法。

  • 对于 Observable.doOnNext/doOnCompleted/doOnError/doOnEach/map 这类中间操作,生成一个新的订阅者 Subscriber,封装了相关行为,用于添加新的逻辑,并代理了对之前订阅者的调用;用新的订阅者和当前主题创建新的主题并返回。(采用的是包装器模式)
    继续阅读

应用事务管理混乱导致的一个坑

Spring 的事务传播属性

org.springframework.transaction.annotation.Propagation 定义了 Spring 的事务传播属性:

  • REQUIRED: 支持当前事务,如果不存在则新建一个。

  • REQUIRES_NEW: 创建新的事务,如果当前存在一个则 suppend 当前的。

  • SUPPORTS: 支持当前事务,如果不存在则以非事务方式执行。

  • MANDATORY: 支持当前事务,如果不存在则抛出异常。

  • NOT_SUPPORTED: 以非事务方式执行,如果当前存在一个事务则 suppend 当前的。

  • NEVER: 以非事务方式执行,如果存在事务则抛出异常。

  • NESTED: 如果当前存在一个事务则以嵌套事务的方式执行。

一个生产问题

一开始是 DBA 反馈数据库出现两种现象:

  1. 出现一些操作做完但会话一直还在等待客户端的提交动作。
  2. 偶尔出现大量的行锁,导致 JVM 线程互相等待而假死。

有个获取流水号的方法 systemService.getSerialNo 的事务传播属性是 NOT_SUPPORTED 的,这个方法通过类似这样的 update t_serialno set serial_no = :newSerialNo where serial_key = :key and serial_no = :oldSerialNo SQL 语句进行更新,更新返回的受影响行数等于 1 认为新的流水号是不重复的,更新不成功则重试。

行锁就出现在这些流水号的更新上。

继续阅读

微热山丘,探索 IoC、AOP 实现原理(二) AOP 实现原理

AOP 实现原理

一、简介

AOP 是 Aspect Oriented Programming 的简写,面向切面编程。主要的作用是以一种统一的方式对程序的逻辑进行修改、增强处理。可以在编译时也可以在运行时实现。编译时处理一般是通过字节码处理技术,运行时进行的一般是通过动态代理技术实现。

二、AOP 核心概念

  • Concerns, 关注:有两类,核心关注–是关于业务逻辑的;横切关注–是一些通用的逻辑,比如日志、缓存。
  • Joinpoint, 连接点:是执行时的切入点。可以是字段访问也可以是被调用方法。基于动态代理技术实现的一般只支持方法调用的连接点。
  • Target, 目标:是一个被切入的地方,一般是一个业务逻辑。比如是对一个业务逻辑的方法的调用。
  • Pointcut, 切入点:并不是所有的连接点都需要切入,切入点用于指定哪些连接点需要切入。
  • Advice, 建议:定义了 Aspect 的任务和什么时候执行它,是在核心关注之前还是之后。
  • Aspect, 方面:Advice 和 pointcut 定义一个方面。Advice 定义 Aspect 的任务和什么时候执行它,而切入点 pointcut 定义在哪里具体地方切入。就是说 Aspect 定义了它是什么东西、什么时候切入和在哪里切入。
  • Weaving, 织入:织入是一个把横切方面混合到业务目标对象的过程。可以是在编译时间,也可以是在运行时使用类加载机制,Spring AOP 是在运行时生成 bean 时处理。

下面是在 Sping 的 XML 里定义一个 AOP 的配置,注意其中个元素间的关系:

<!-- 一个 weaving 的定义 -->
<aop:config>
    <!-- 定义 aspect, ref 指向 任务的定义 -->
    <aop:aspect id="aspectCommonLogHandler" ref="commonLogHandler">
        <!-- 定义 pointcut -->
        <aop:pointcut id="commonLogPointcut" expression="execution( * net.coderbee.*.controller..*.*(..))" />

        <!-- 定义 advice, around 表示在目标的前/后执行 -->
        <aop:around method="inceptor" pointcut-ref="commonLogPointcut" />
    </aop:aspect>
</aop:config>

三、AOP 实现

1. 各组件定义配置

warnhill 的 AOP 实现没有采用 Spring 那样的配置方式,而是采用 bean 定义的形式组织起来:

<!-- 定义织入逻辑的实现 -->
<bean id="aspectJAutoProxyCreator" class="net.coderbee.warmhill.aop.AspectJAutoProxyCreator" />

<!-- 定义 pointcut -->
<bean id="pointcut" class="net.coderbee.warmhill.aop.AspectJExpressionPointcut" >
    <property name="expression" value="execution(* net.coderbee..*.*(..))" />
</bean>

<!-- 定义 任务 -->
<bean id="timerInterceptor" class="net.coderbee.warmhill.aop.TimerInterceptor" />
<bean id="anotherInterceptor" class="net.coderbee.warmhill.aop.AnotherInterceptor" />

<!-- 定义 aspect -->
<bean id="testAdvisor" class="net.coderbee.warmhill.aop.AspectJExpressionPointcutAdvisor">
    <property name="advice" ref="timerInterceptor" />
    <property name="pointcut" ref="pointcut" />
</bean>
<bean id="testAdvisor2" class="net.coderbee.warmhill.aop.AspectJExpressionPointcutAdvisor">
    <property name="advice" ref="anotherInterceptor" />
    <property name="pointcut" ref="pointcut" />
</bean>

继续阅读

微热山丘,探索 IoC、AOP 实现原理(一) IoC 实现原理

一. 简介及项目设定

1.1 微热山丘 介绍

warmhill(微热山丘)是一个参考 Spring 实现 IoC、AOP 特性的小项目。

相比于 Spring 庞杂的分类、层层继承、抽象,warmhill(微热山丘) 里都是简单直接的类、方法调用,核心在于简洁地展现实现原理。

1.2 项目设定及限制

有如下的限定:
1. 所有的 bean 都是单例。
2. bean 只有一个唯一的标识符 id,没有名字、别名。
3. 所有的 bean 都是立即初始化的,不支持延迟初始化。
4. 对于 BeanPostProcessor 的应用是基于声明的先后顺序。
5. 对于 AOP 的切入点,只支持对方法调用的拦截,不细分 Before/After/Around/Throw 等。
6. 对于 AOP 的配置也是基于 bean 定义的,不支持 <aop:config> 标签。
7. 目前只支持从 XML 方式配置 bean 。
8. bean 的属性目前只支持 String 类型和对其他 bean 的引用,只支持 setter 方法的依赖注入。

二. IoC、AOP 基本概念介绍

  • Resource: 资源。可存放在任意位置,只有一个方法: InputStream getStream();

  • BeanDefinition: bean 的定义信息。

  • BeanDefitionLoader: bean 定义加载类,只有一个方法: List<BeanDefinition> load();

  • BeanFactory: bean 工厂,负责根据 bean 定义创建 bean 的实例。

继续阅读

Spring 事务管理的一个 trick

问题

最近有同事碰到这个异常信息: Transaction rolled back because it has been marked as rollback-only ,异常栈被吃了,没打印出来。

调用代码大概如下:

@Component
public class InnerService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional(rollbackFor = Throwable.class)
    public void innerTx(boolean ex) {
        jdbcTemplate.execute("insert into t_user(uname, age) values('liuwhb', 31)");
        if (ex) {
            throw new NullPointerException();
        }
    }

}

@Component
public class OutterService {
    @Autowired
    private InnerService innerService;

    @Transactional(rollbackFor = Throwable.class)
    public void outTx(boolean ex) {
        try {
            innerService.innerTx(ex);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

outterService.outTx(true);

他期望的是 innerService.innerTx(ex); 调用即使失败了也不会影响 OutterService.outTx 方法上的事务,只回滚了 innerTx 的操作。

结果没有得到他想要的,调用 OutterService.outTx 的外围方法捕获到了异常,异常信息是 Transaction rolled back because it has been marked as rollback-onlyoutTx 的其他操作也没有提交事务。

分析

上述方法的事务传播机制的默认的,也就是 Propagation.REQUIRED,如果当前已有事务就加入当前事务,没有就新建一个事务。

事务性的方法 outTx 调用了另一个事务性的方法 innerTx 。调用方对被调用的事务方法进行异常捕获,目的是希望被调用方的异常不会影响调用方的事务。

但还是会影响调用方的行为的。Spring 捕获到被调用事务方法的异常后,会把事务标记为 read-only,然后调用方提交事务的时候发现事务是只读的,就会抛出上面的异常。

继续阅读

Feign

简介

Feign 是一个 Java 到 HTTP 客户端的粘合剂。Feign 的目标是以最少的开销和代码把 你的代码连接到 http api 上。通过定制的编解码器和错误处理,你可以请求任何基于文本的 http api 。

原理

通过处理注解信息来生成模板化请求。在发出请求前,参数直接应用到这些模板上。这限制了 Feign 只支持基于文本的 api,这显著简化了系统的一些方面如重放请求。

为什么选择 Feign

  • 依赖问题。目前项目组用的是 Hessian 做远程调用,由于 Hessian 存在对 jar 包的依赖,特别是一些项目升级到 JDK 1.8,采用 Maven 构建;而老的一些任然采用 1.6 ,是个简单的 Eclipse 工程,导致打包、部署非常麻烦。

  • 依赖于接口,使用简洁。对于客户端,只依赖于接口类,通过 Spring 注入实现,迁移基本不需要很大的改动。

  • 与 Spring Cloud 集成。Feign 本身是 Spring Cloud 的一个组件,可以通过 Ribbon 做路由,可以进一步提升服务化。

  • 系统对性能的要求并不是那种很严苛的,基于文本的调用也方便调试。

继续阅读

分布式系统间请求跟踪

一、请求跟踪基本原理

现在的很多应用都是由很多系统系统在支持,这些系统还会部署很多个实例,用户的一个请求可能在多个系统的部署实例之间流转。为了跟踪一个请求的完整处理过程,我们可以给请求分配一个唯一的 ID traceID,当请求调用到另一个系统时,我们传递这个 traceID。在输出日志时,把这个 traceID 也输出到日志里,这样,根据日志文件,提取出现这个 traceID 的日志就可以分析这个请求的完整调用过程,甚至进行性能分析。

当然,在一个系统内部,我们不希望每次调用一个方法时都要传递这个 traceID,因此在 Java 里,一般把这个 traceID 放到某种形式的 ThreadLocal 变量里。

日志类库在输出日志时,就从这个 ThreadLocal 变量里取出 traceID,跟要输出的日志信息一起写入日志文件。

这样对于应用的开发者来说,基本不需要关注这个 traceID

二、远程调用间传递跟踪信息

如何使用的是自定义的 RPC 实现,这些 RPC 一般都预留了扩展机制来传递一些自定义的信息,我们可以把 traceID 作为自定义信息进行传递。

对于 Hessian 这种基于 HTTP 协议的 RPC 方法,它把序列化后的调用信息作为 POST 的请求体。如果我们不想去修改 Hessian 的调用机制,可以把 traceID 放到 HTTP 的请求头里。

在客户端只需要提供封装好的 RPC 调用代理。

在服务端通过 Filter 得到 traceID 放入 ThreadLocal 变量。

继续阅读

《Netty in action》 第三章 Netty 组件和设计

第三章 Netty 组件和设计

从高层视角,Netty address 两个等价的关注域:技术和架构
* 首先,它是构建于 Java NIO 上的异步、事件驱动的实现,保证了在高负载下最大的应用性能和伸缩性;
* Netty 体现了一组设计模式,从网络层解耦应用逻辑,简化开发,最大化可测试性、模块化、代码重用。

当我们更细微地学习 Netty 的独立组件时,我们将聚焦于它们是如何协作来支持这些架构最佳实践。通过遵循同样的原则,我们将获得 Netty 能提供的所有好处。

3.1 Channel, EventLoop, ChannelFuture

Channel, EventLoop, ChannelFuture 放到一起,可以代表了 Netty 的网络层抽象:

  • Channel: Sockets
  • EventLoop: 控制流,多线程,并发
  • ChannelFuture: 异步通知

3.1.1 Channel 接口

基本的 I/O 操作(bind(), connect(), read(), write()) 依赖于底层网络传输的提供的原子操作。在基于 Java 的网络里,基础结构是 Socket 类。Netty 的 Channel 接口提供的 API 极大简化了直接操作 Sockets 的复杂工作。

3.1.2 EventLoop 接口

EventLoop 定义了 Netty 处理一个连接生命周期里发生的事件的核心抽象。Channel、EventLoop、EventLoopGroup 之间的关系如下:

  • 一个 EventLoopGroup 包含一个或多个 EventLoop;
  • 一个 EventLoop 在它的生命周期里是绑定到单一线程的;
  • 一个 EventLoop 处理的所有 I/O 事件都是在它的专用线程上进行的;
  • 一个 Channel 在它的生命周期里是注册到到单一的 EventLoop;
  • 单一的 EventLoop 可能被赋给一个或多个 Channel。

注意,在这种设计里,给定 Channel 的 I/O 事件都由同一个线程来执行,事实上消除了对同步的需要。

继续阅读

《程序员必读之软件架构》–笔记

架构是什么

架构作为名词解释时,概括起来都与结构有关:将产品分解为一序列组件、模块和交互。

架构作为动词来解释时,包括了理解你需要构建什么、设定愿景以便进行构建和做出恰当的设计决策。所有这些都以需求为基础,因为需求驱动架构。关键在于,架构是关于交流愿景以及引入技术领导力的,这样参与构建产品的每个人都能理解这个愿景,并为产品的成功做出积极贡献。

不论何种领域的架构,其实主要就是结构和愿景。

架构分类

应用程序架构的关注点是应用程序,通常包括将应用程序结构为类和组件,确保设计模式正确应用,构建或使用框架,等等。本质上,应用程序架构谈论的是软件设计的低级别切面,通常只考虑单一的技术栈。应用程序架构着重考虑软件和代码组织。

系统架构是更大规模的应用程序架构。大多数软件系统实际上是由横跨不同层次和技术的多个应用程序组成。系统架构还关注互操作性和与环境中其他系统的集成。系统架构描述为从组件和服务到子系统等高层次的抽象。系统架构的定义大多数还包括了软件和硬件。

软件架构就是应用程序和系统架构的结合。从代码结构和基础到将代码成功部署到生产环境,与一个软件系统重要元素相关的所有东西就是软件架构。

企业架构一般是指整个组织的中心工作,着眼于如何组织和利用人员、流程和技术来使企业有效和高效地工作。它是关于企业如何分成组或部门,业务流程如何在这上层运作,以及技术如何支撑这一切。

好的架构带来敏捷。

软件架构角色

架构 VS. 设计

作为名词,设计是指一个系统内命名的结构或行为,解决或有助于月解决该系统的一个或多个问题。因而,设计代表了潜在的决策空间中的一个点。

所有架构都是设计,但并非所有设计都是架构。

架构反映了使一个系统成型的重要设计决策,而重要性则通过改变的成本来衡量。

尽管“重要决策”没法彻底消失,但能通过架构分层等多种策略来改变。软件系统架构流程的一部分就是搞清楚哪些是重要的及为什么。

思考软件架构的好处

  • 让团队跟随一个清晰的愿景和路线图,无论这个愿景是一个人所有还是整个团队共有;
  • 技术领导力和更好的协调;
  • 与人交流的刺激因素,以便回答与重要决策、非功能需求、限制和其他横切关注点相关的问题;
  • 识别或减轻风险的框架;
  • 方法和标准的一致性,随之而来的结构良好的代码库;
  • 正在构建的产品的坚实基础;
  • 对不同的听众,以不同层次的抽象来交流解决方案的结构。

C4:语境、容器、组件和类

c4: 语境 容器 组件 类
软件的静态视图:

  • 语境:设定场景的高层次图,包括关键的系统依赖和参与者。

    语境图帮助回答下面的问题:

    1. 我们构建的(或已经构建的)软件系统是什么?
    2. 谁会用它?
    3. 如何融入已有的 IT 系统?
  • 容器:荣企图显示了高层次的技术选择,容器如何分担职责、如何通信。

    1. 软件系统的整体形态是什么样的?
    2. 高层次技术决策有哪些?
    3. 职责在系统中如何分布?
    4. 容器之间如何相互交流?
    5. 为了实现特性,作为一个开发者,我需要在哪里写代码?
  • 组件:组件图可以让你看到每个容器的关键逻辑组件及之间的关系。

    1. 系统由哪些组件/服务组成?
    2. 在高层次上,系统如何工作是否清晰?
    3. 所有组件/服务都有一个家吗(即驻留在一个容器中)?
  • 类:可选的细节层次。

原则

约束通常是强加于你的,而原则是你为了将一致性和清晰度引入最终代码库而想采用的原则(如编码规范、自动化测试等)或架构的原则(如分层策略,架构模式等)。

%e5%85%b3%e6%b3%a8%e7%82%b9


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。