Java8 StampedLock

概述

StampedLock 是基于能力的锁,用三种模式来控制读/写访问。StampedLock 的状态包含了 版本和模式。锁获取方法根据锁的状态返回一个表示和控制访问的标志(stamp),“try”版本的这些方法可能返回一个特殊的值 0 来表示获取失败。锁释放和它的变换方法要求一个标志作为参数,如果它们不符合锁的状态就失败。这三种模式是:

  • 写:方法 writeLock 可能阻塞等待独占访问,返回一个标志,可用在方法 unlockWrite 以释放锁。也提供了无时间和带时间版本的 tryWriteLock 方法。当锁以写模式持有时,没有读锁可以获取,所有乐观性读确认将失败。

  • 读:方法 readLock 可能为非独占访问而阻塞等待,返回一个标志用于方法 unlockRead 以释放锁。也提供了无时间和带时间版本的 tryWriteLock 方法。

  • 乐观读:只有在锁当前没有以写模式持有时,方法 tryOptimisticRead 返回一个非 0 标志。如果锁自给定标志以来没有以写模式持有,方法 validate 返回 true 。这种模式可以认为是一种极弱版本的读锁,可以在任意时间被写者打破。在短的只读代码段使用乐观模式常常可以减少竞争和提升吞吐量。然而,它的使用天生是脆弱的。乐观读片段section应该只读字段并持有到本地变量,用于以后使用,在确认以后。乐观读模式里的字段读取可能很不一致,所以惯例只用于当你对数据表示足够熟悉,可以检查一致性和/或重复调用 validate() 方法。例如,这些步骤典型地在第一次读取对象或数组引用,然后访问其中字段、元素或方法时要求。

继续阅读

JUC Exchanger

一、概述

Exchanger 可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

二、算法描述

基本想法是维护一个槽指向一个结点,结点包含一个准备提供(出去)的item和一个等待填充的洞。如果一个到来的“occupying”线程看到槽是空的,CAS结点到那里并等待另一个线程发起交换。第二个“fulfilling”线程看到槽是非空的,把它CAS回空,也通过CAS洞交换item,加上唤醒occupying线程,如果它是阻塞的。在每种情况下CAS可能失败,因为一个槽一开始看起来是非空的,但在CAS时不是,或者相反,所以线程需要重试这些动作。

这个简单方法在只有少量线程使用Exchanger时工作得很好,当大量线程使用单个Exchanger时性能快速恶化,由于CAS竞争。因此我们使用一个“arena, 竞技场”,基本上是一个哈希表,有动态可变数量的槽,每一个槽都可被任意线程执行交换。进入线程基于自己的线程ID选择槽,如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往 0 号槽,那里总是存在如果表收缩的话。控制这个的特殊机制如下:

继续阅读

JUC Semaphore 信号量

概要

一个计数信号量,维护了一组许可。acquire() 方法在许可可用前将阻塞,许可可用时获取,每个release() 方法添加一个许可,潜在地释放一个阻塞的获取线程。

信号量常用于约束访问一些(物理或逻辑)资源的线程数量。

信号量初始化为 1 可以用作互斥锁,更常见的是称为二进制信号量,因为它只有两个状态:有一个许可可用,或没有许可可用。当以这种方式使用时,二进制信号量有个属性,“锁”可以被其他线程而不是属主线程(信号量没有记录属主关系)释放。

公平性属性:公平策略,按FIFO顺序分配许可;非公平策略,允许闯入,即 acquire 调用线程获取许可,而不是已经在等待的线程。

一般地,信号量用于控制资源的访问应当初始化为公平的,来保证没有线程因为不能访问资源而饥饿。当把信号量用于其他同步类型的控制,非公平顺序的吞吐量优势压过公平的考量。

内存一致性效果:线程调用 “release” 之前的动作 happens-before 于成功调用 “acquire” 的其他线程的后续动作。

继续阅读

JUC CyclicBarrier 可重用屏障

介绍

CyclicBarrier 是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待,在通过临界点之后,这些线程是独立的;CountDownLatch 是让一个线程等待其他线程完成某些任务,其他线程之间一直是独立的。

CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。

在释放所有线程后,CyclicBarrier 可以通过重置状态来重用,这也是 Cyclic 的来源。

继续阅读

Future 与 FutureTask

Future

来自 Java DOC 文档:Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

也就是说Future具有这样的特性:

  • 异步执行,可用 get 方法获取执行结果。
  • 如果计算还没完成,get 方法是会阻塞的,如果完成了,是可以多次获取并立即得到结果的。
  • 如果计算还没完成,是可以取消计算的。
  • 可以查询计算的执行状态。

埋两个小问题用于设想下怎么实现Future:

  1. Future在计算完成前阻塞 get 访问,完成后可以自由访问,如何实现 get 方法?
  2. 计算的取消是怎么实现的?被取消的计算会终止执行吗?

继续阅读

JUC 可重入 读写锁 ReentrantReadWriteLock

读写锁 ReadWriteLock

读写锁维护了一对相关的锁,一个用于只读操作,一个用于写入操作。只要没有writer,读取锁可以由多个reader线程同时保持。写入锁是独占的。

互斥锁一次只允许一个线程访问共享数据,哪怕进行的是只读操作;读写锁允许对共享数据进行更高级别的并发访问:对于写操作,一次只有一个线程(write线程)可以修改共享数据,对于读操作,允许任意数量的线程同时进行读取。

与互斥锁相比,使用读写锁能否提升性能则取决于读写操作期间读取数据相对于修改数据的频率,以及数据的争用——即在同一时间试图对该数据执行读取或写入操作的线程数。

读写锁适用于读多写少的情况。

可重入读写锁 ReentrantReadWriteLock

属性

ReentrantReadWriteLock 也是基于 AbstractQueuedSynchronizer 实现的,它具有下面这些属性(来自Java doc文档):

  • 获取顺序:此类不会将读取者优先或写入者优先强加给锁访问的排序。
    • 非公平模式(默认):连续竞争的非公平锁可能无限期地推迟一个或多个reader或writer线程,但吞吐量通常要高于公平锁。
    • 公平模式:线程利用一个近似到达顺序的策略来争夺进入。当释放当前保持的锁时,可以为等待时间最长的单个writer线程分配写入锁,如果有一组等待时间大于所有正在等待的writer线程的reader,将为该组分配读者锁。
    • 试图获得公平写入锁的非重入的线程将会阻塞,除非读取锁和写入锁都自由(这意味着没有等待线程)。
  • 重入:此锁允许reader和writer按照 ReentrantLock 的样式重新获取读取锁或写入锁。在写入线程保持的所有写入锁都已经释放后,才允许重入reader使用读取锁。
    writer可以获取读取锁,但reader不能获取写入锁。
  • 锁降级:重入还允许从写入锁降级为读取锁,实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。

  • 锁获取的中断:读取锁和写入锁都支持锁获取期间的中断。

  • Condition 支持:写入锁提供了一个 Condition 实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。
    读取锁不支持 ConditionreadLock().newCondition() 会抛出 UnsupportedOperationException

  • 监测:此类支持一些确定是读取锁还是写入锁的方法。这些方法设计用于监视系统状态,而不是同步控制。

继续阅读

JUC ConcurrentLinkedQueue

java.util.concurrent.ConcurrentLinkedQueue 是一个基于链接结点的、无界、线程安全的、FIFO队列。它的是实现采用了无等待(wait-free)、无锁(lock-free)算法,该算法基于 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。

一、设计思路

  1. 使用 CAS 原子指令来处理对数据的并发访问,把同步最小化到单个硬件指令上;这是无锁算法的基础。
  2. 分别用 head、tail 两个原子指针来指向队头和队尾,可以同时进行入队和出队操作,但允许 head、tail 并不总是指向有效的头和尾;把入队、出队需要同步更新的范围最小化到单个原子变量上,这是无锁算法实现的关键。
  3. 在入队、出队操作上并不总是更新 head、tail,而是在多次操作后才进行更新,节省了CAS指令。具有批量更新的效果。
  4. 由于 head、tail 并不总是指向头和尾,这就是说,队列会出现不一致的情况,所以在 head、tail 上定义了一些不变式来维护算法的正确性。

二、不变式

不变式是在执行方法之前和之后,队列必须要保持的。可变式是在执行过程中,队列允许出现的不一致情况。

head 的约束

通过这个结点能够在 O(1) 时间到达第一个存活(非已删除)结点,如果有的话。

不变式
  • 所有存活结点可以 从 head 开始通过 succ() 访问。
  • head != null
  • (tmp = head).next != tmp || tmp != head,这个不变式是说 head 在方法开始之前、之后,head 指向的结点应该是在队列上的。
可变式
  • head.item 可能是、也可能不是 空的。
  • 允许 tail 落后于 head,那是因为 tail 不能从 head 到达。

tail 的约束

从它可以在 O(1) 时间访问到队列的最后一个结点(唯一的满足 node.next == null 的结点)。这个也就是说 tail 指向的并不总是最后一个存活结点。

不变式
  • 最后的结点总是可以从 tail 开始通过 succ() 访问。
  • tail != null
可变式
  • tail.item 可能是、也可能不是空。
  • 允许 tail 落后于 head,那是因为 tail 不能从 head 到达。
  • tail.next 可能是、也可能不是 自指向到 tail。
    继续阅读

JUC LinkedBlockingQueue

java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。

队列是否为空、是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头、队尾并发地进行访问、添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger,这就决定了它的容量范围是: 1 – Integer.MAX_VALUE。

由于同时使用了两把锁,在需要同时使用两把锁时,加锁顺序与释放顺序是非常重要的:必须以固定的顺序进行加锁,再以与加锁顺序的相反的顺序释放锁。

头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。
继续阅读

JUC ArrayBlockingQueue

java.util.concurrent.ArrayBlockingQueue 是一个线程安全的、基于数组、有界的、阻塞的、FIFO 队列。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

此类基于 java.util.concurrent.locks.ReentrantLock 来实现线程安全,所以提供了 ReentrantLock 所能支持的公平性选择。

属性

队列的操作主要有读、写,所以用了两个 int 类型的属性作为下一个读写位置的的指针。存放元素的数组是 final 修饰的,所以数组的大小是固定的。对于并发控制,是所有的访问都必须加锁,并用两个条件对象用于协调读写操作。

// 队列存放元素的容器
final Object[] items;

// 下一次读取或移除的位置
int takeIndex;

// 存放下一个放入元素的位置
int putIndex;

// 队列里有效元素的数量
int count;


// 所有访问的保护锁
final ReentrantLock lock;

// 等待获取的条件
private final Condition notEmpty;

// 等待放入的条件
private final Condition notFull;

环绕处理

如果指针一直往前增加或一直往后减小,那么总会超出数组的有效索引范围。所以需要进行一些环绕处理。

// 指针前移
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

// 指针后移
final int dec(int i) {
    return ((i == 0) ? items.length : i) - 1;
}

注意,上面的处理都是对指针值的直接处理,而不关心是读指针还是写指针,因为是否有可读元素、可写空间的判断是通过对 count 计数来判断的。

这也是 count 的作用,它极大地简化了指针有效性的判断。在下面的 insertextract 方法中根本就不需要对读写指针之间的位置关系进行判断,非常精妙。

通过环绕处理可以把这个数组看成是圆形的缓存。
继续阅读

JUC 源码分析 三 AbstractQueuedSynchronizer 共享模式 与 CountDownLatch

共享模式

共享模式允许一组线程获取同一个许可。为实现共享模式子类需要实现两个方法:

  • tryAcquireShared:返回int类型的值,小于0表示获取失败,等于0表示获取成功但不允许后续更多的获取,大于0表示获取成功且允许更多的后续获取。
  • tryReleaseShared:返回true表示释放许可成功,可以唤醒等待线程;false表示失败,不唤醒等待线程。

共享获取 acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
       // 添加到等待队列,不管是共享模式还是独占模式,都共享同一个等待队列。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg); // 尝试获取,返回值表示是否允许获取
                if (r >= 0) {
                   // 获取成功
                   // 把自己设为头结点并传递可以获取的信号
                   // node 把自己设为头结点后,它的后继发现它的前驱是头结点了,就会尝试获取。
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 尝试通知队列里的下一个结点,如果:
     *       调用者指示或者之前操作记录显示需要传递
     *       (注意:这里对waitStatus使用单一检查,因为PROPAGATE可能被转换到SIGNAL)
     *   并且
     *       下一个结点以共享模式等待或者我们根本就不知道,因为它是空的。
     *
     * 在这些检查有点保守,可能导致不必要的唤醒,但只是在多重竞争acquires/releases时,
     * 因此,大多数都是现在或不久就需要通知的。
     */
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null; // for GC
    node.prev = null;
}

继续阅读