FutureTask (JDK1.8)

本文基于 JDK 1.8.0_101 对应的源码。

FutureTask 表示一个可取消的异步计算的任务,其构造器接收一个 Runnable/Callable 表示异步计算的任务。虽然说 FutureTask 支持取消,但任务一旦开始执行,执行过程就可能不受取消操作的影响。

当任务没有执行结束时,所有获取结果的操作都会阻塞等待。

以栈的方式维护等待获取结果的线程,因为不用考虑公平性的问题,用栈更简单。

FutureTask 在 JDK 1.7 的时候是基于 AQS 实现的,可以看这里

总体上:

  • volatile int state 控制执行状态,
  • volatile Thread runner 防止重复执行,
  • volatile WaitNode waiters 以栈形式维护等待线程。

继续阅读

再读 AQS

基于 JDK 1.8.0_101 。

AQS 的核心:两种模式、一个状态、一个同步等待队列、条件对象。

  • 两种模式:是指支持独占模式或共享模式,两者不能并存,要么独占、要么共享。
  • 一个状态:状态是 AQS 的灵魂,根据所处的模式来定义其值的含义。
  • 一个同步等待队列:是线程为进入指定的模式、但还不能进入时,需要在同步等待队列上进行等待。该队列是双向链表,以方便支持取消操作。
  • 条件对象:只能在独占模式下使用,线程可以在条件对象上进行等待、由其他线程在满足条件时唤醒。
    > 每个条件对象其实是一个单向链表实现的条件等待队列,在给定条件对象上等待的线程会进入条件等待队列,等待其他线程来唤醒。
    > 进入等待之前,线程会先进入条件等待队列、保存自己持有时的状态值,然后释放独占模式,进入挂起线程、等待唤醒,唤醒后重新尝试进入独占模式,进入后再继续执行。

继续阅读

JUC 并发 Queue 设计与介绍

Queue 体系

Queue 是一种先进先出的队列。

ArrayBlockingQueue 和 LinkedBlockingQueue 是带阻塞特性,基于锁来实现。ArrayBlockingQueue 采用同一把锁来控制出、入队列操作;LinkedBlockingQueue 用两把锁来分别控制出、入队列操作,提高了并发性能。

ConcurrentLinkedQueue 非阻塞,采用无锁算法、利用 CAS 操作来实现。

0.1 BlockingQueue

当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素、但队列为空时,消费者会被阻塞。

其实现类必须是线程安全,入队列 happen-before 出队列。

0.2 TransferQueue

继承自 BlockingQueue,更进一步:生产者会一直阻塞直到添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列)。

特别适用于这种应用间传递消息的场景:生产者有时需要等待消费者接收消息,有时只需把消息放进队列、不需要等待消费者接收。

// 传递元素给消费者,如果需要则等待。确保一次传递完成。
void transfer(E e);

// 非阻塞
boolean tryTransfer(E e);

// 基于等待时间的。
boolean tryTransfer(E e, long timeout, TimeUnit unit);

// 返回是否有在等待接收元素的消费者
// (BlockingQueue.take()或带等待时间的 poll 方法调用)
boolean hasWaitingConsumer();

// 返回大概的在等待接收元素的消费者
//(BlockingQueue.take()或带等待时间的 poll 方法调用)
int getWaitingConsumerCount();

0.3 Deque

允许在两端进行插入、删除元素的线性集合。

实现类:

  • ArrayDeque:基于数组加头尾两个指针来实现、非线程安全的。
  • LinkedList:基于双向链表实现、非线程安全的。
  • ConcurrentLinkedDeque:基于双向链表、CAS 元语实现、无界的。

0.4 BlockingDeque

当 Deque 里没有元素时阻塞消费者,当没有空闲空间时阻塞生产者。

目前只有一个实现类 LinkedBlockingDeque,使用双向链表来存储元素,支持容量限制,用一把锁来保证线程安全性。因为允许在两端进行操作,双向链表更合适。

继续阅读

Java8 StampedLock

概述

StampedLock 是基于能力的锁,用三种模式来控制读/写访问。StampedLock 的状态包含了 版本和模式。锁获取方法根据锁的状态返回一个表示和控制访问的标志(stamp),“try”版本的这些方法可能返回一个特殊的值 0 来表示获取失败。锁释放和它的变换方法要求一个标志作为参数,如果它们不符合锁的状态就失败。这三种模式是:

  • 写:方法 writeLock 可能阻塞等待独占访问,返回一个标志,可用在方法 unlockWrite 以释放锁。也提供了无时间和带时间版本的 tryWriteLock 方法。当锁以写模式持有时,没有读锁可以获取,所有乐观性读确认将失败。

  • 读:方法 readLock 可能为非独占访问而阻塞等待,返回一个标志用于方法 unlockRead 以释放锁。也提供了无时间和带时间版本的 tryWriteLock 方法。

  • 乐观读:只有在锁当前没有以写模式持有时,方法 tryOptimisticRead 返回一个非 0 标志。如果锁自给定标志以来没有以写模式持有,方法 validate 返回 true 。这种模式可以认为是一种极弱版本的读锁,可以在任意时间被写者打破。在短的只读代码段使用乐观模式常常可以减少竞争和提升吞吐量。然而,它的使用天生是脆弱的。乐观读片段section应该只读字段并持有到本地变量,用于以后使用,在确认以后。乐观读模式里的字段读取可能很不一致,所以惯例只用于当你对数据表示足够熟悉,可以检查一致性和/或重复调用 validate() 方法。例如,这些步骤典型地在第一次读取对象或数组引用,然后访问其中字段、元素或方法时要求。

继续阅读

JUC Exchanger

一、概述

Exchanger 可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

二、算法描述

基本想法是维护一个槽指向一个结点,结点包含一个准备提供(出去)的item和一个等待填充的洞。如果一个到来的“occupying”线程看到槽是空的,CAS结点到那里并等待另一个线程发起交换。第二个“fulfilling”线程看到槽是非空的,把它CAS回空,也通过CAS洞交换item,加上唤醒occupying线程,如果它是阻塞的。在每种情况下CAS可能失败,因为一个槽一开始看起来是非空的,但在CAS时不是,或者相反,所以线程需要重试这些动作。

这个简单方法在只有少量线程使用Exchanger时工作得很好,当大量线程使用单个Exchanger时性能快速恶化,由于CAS竞争。因此我们使用一个“arena, 竞技场”,基本上是一个哈希表,有动态可变数量的槽,每一个槽都可被任意线程执行交换。进入线程基于自己的线程ID选择槽,如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往 0 号槽,那里总是存在如果表收缩的话。控制这个的特殊机制如下:

继续阅读

JUC Semaphore 信号量

概要

一个计数信号量,维护了一组许可。acquire() 方法在许可可用前将阻塞,许可可用时获取,每个release() 方法添加一个许可,潜在地释放一个阻塞的获取线程。

信号量常用于约束访问一些(物理或逻辑)资源的线程数量。

信号量初始化为 1 可以用作互斥锁,更常见的是称为二进制信号量,因为它只有两个状态:有一个许可可用,或没有许可可用。当以这种方式使用时,二进制信号量有个属性,“锁”可以被其他线程而不是属主线程(信号量没有记录属主关系)释放。

公平性属性:公平策略,按FIFO顺序分配许可;非公平策略,允许闯入,即 acquire 调用线程获取许可,而不是已经在等待的线程。

一般地,信号量用于控制资源的访问应当初始化为公平的,来保证没有线程因为不能访问资源而饥饿。当把信号量用于其他同步类型的控制,非公平顺序的吞吐量优势压过公平的考量。

内存一致性效果:线程调用 “release” 之前的动作 happens-before 于成功调用 “acquire” 的其他线程的后续动作。

继续阅读

JUC CyclicBarrier 可重用屏障

介绍

CyclicBarrier 是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待,在通过临界点之后,这些线程是独立的;CountDownLatch 是让一个线程等待其他线程完成某些任务,其他线程之间一直是独立的。

CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。

在释放所有线程后,CyclicBarrier 可以通过重置状态来重用,这也是 Cyclic 的来源。

继续阅读

Future 与 FutureTask

Future

来自 Java DOC 文档:Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

也就是说Future具有这样的特性:

  • 异步执行,可用 get 方法获取执行结果。
  • 如果计算还没完成,get 方法是会阻塞的,如果完成了,是可以多次获取并立即得到结果的。
  • 如果计算还没完成,是可以取消计算的。
  • 可以查询计算的执行状态。

埋两个小问题用于设想下怎么实现Future:

  1. Future在计算完成前阻塞 get 访问,完成后可以自由访问,如何实现 get 方法?
  2. 计算的取消是怎么实现的?被取消的计算会终止执行吗?

继续阅读

JUC 可重入 读写锁 ReentrantReadWriteLock

读写锁 ReadWriteLock

读写锁维护了一对相关的锁,一个用于只读操作,一个用于写入操作。只要没有writer,读取锁可以由多个reader线程同时保持。写入锁是独占的。

互斥锁一次只允许一个线程访问共享数据,哪怕进行的是只读操作;读写锁允许对共享数据进行更高级别的并发访问:对于写操作,一次只有一个线程(write线程)可以修改共享数据,对于读操作,允许任意数量的线程同时进行读取。

与互斥锁相比,使用读写锁能否提升性能则取决于读写操作期间读取数据相对于修改数据的频率,以及数据的争用——即在同一时间试图对该数据执行读取或写入操作的线程数。

读写锁适用于读多写少的情况。

可重入读写锁 ReentrantReadWriteLock

属性

ReentrantReadWriteLock 也是基于 AbstractQueuedSynchronizer 实现的,它具有下面这些属性(来自Java doc文档):

  • 获取顺序:此类不会将读取者优先或写入者优先强加给锁访问的排序。
    • 非公平模式(默认):连续竞争的非公平锁可能无限期地推迟一个或多个reader或writer线程,但吞吐量通常要高于公平锁。
    • 公平模式:线程利用一个近似到达顺序的策略来争夺进入。当释放当前保持的锁时,可以为等待时间最长的单个writer线程分配写入锁,如果有一组等待时间大于所有正在等待的writer线程的reader,将为该组分配读者锁。
    • 试图获得公平写入锁的非重入的线程将会阻塞,除非读取锁和写入锁都自由(这意味着没有等待线程)。
  • 重入:此锁允许reader和writer按照 ReentrantLock 的样式重新获取读取锁或写入锁。在写入线程保持的所有写入锁都已经释放后,才允许重入reader使用读取锁。
    writer可以获取读取锁,但reader不能获取写入锁。
  • 锁降级:重入还允许从写入锁降级为读取锁,实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。

  • 锁获取的中断:读取锁和写入锁都支持锁获取期间的中断。

  • Condition 支持:写入锁提供了一个 Condition 实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。
    读取锁不支持 ConditionreadLock().newCondition() 会抛出 UnsupportedOperationException

  • 监测:此类支持一些确定是读取锁还是写入锁的方法。这些方法设计用于监视系统状态,而不是同步控制。

继续阅读

JUC ConcurrentLinkedQueue

java.util.concurrent.ConcurrentLinkedQueue 是一个基于链接结点的、无界、线程安全的、FIFO队列。它的是实现采用了无等待(wait-free)、无锁(lock-free)算法,该算法基于 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。

一、设计思路

  1. 使用 CAS 原子指令来处理对数据的并发访问,把同步最小化到单个硬件指令上;这是无锁算法的基础。
  2. 分别用 head、tail 两个原子指针来指向队头和队尾,可以同时进行入队和出队操作,但允许 head、tail 并不总是指向有效的头和尾;把入队、出队需要同步更新的范围最小化到单个原子变量上,这是无锁算法实现的关键。
  3. 在入队、出队操作上并不总是更新 head、tail,而是在多次操作后才进行更新,节省了CAS指令。具有批量更新的效果。
  4. 由于 head、tail 并不总是指向头和尾,这就是说,队列会出现不一致的情况,所以在 head、tail 上定义了一些不变式来维护算法的正确性。

二、不变式

不变式是在执行方法之前和之后,队列必须要保持的。可变式是在执行过程中,队列允许出现的不一致情况。

head 的约束

通过这个结点能够在 O(1) 时间到达第一个存活(非已删除)结点,如果有的话。

不变式
  • 所有存活结点可以 从 head 开始通过 succ() 访问。
  • head != null
  • (tmp = head).next != tmp || tmp != head,这个不变式是说 head 在方法开始之前、之后,head 指向的结点应该是在队列上的。
可变式
  • head.item 可能是、也可能不是 空的。
  • 允许 tail 落后于 head,那是因为 tail 不能从 head 到达。

tail 的约束

从它可以在 O(1) 时间访问到队列的最后一个结点(唯一的满足 node.next == null 的结点)。这个也就是说 tail 指向的并不总是最后一个存活结点。

不变式
  • 最后的结点总是可以从 tail 开始通过 succ() 访问。
  • tail != null
可变式
  • tail.item 可能是、也可能不是空。
  • 允许 tail 落后于 head,那是因为 tail 不能从 head 到达。
  • tail.next 可能是、也可能不是 自指向到 tail。
    继续阅读