JUC 并发 Queue 设计与介绍

Queue 体系

Queue 是一种先进先出的队列。

ArrayBlockingQueue 和 LinkedBlockingQueue 是带阻塞特性,基于锁来实现。ArrayBlockingQueue 采用同一把锁来控制出、入队列操作;LinkedBlockingQueue 用两把锁来分别控制出、入队列操作,提高了并发性能。

ConcurrentLinkedQueue 非阻塞,采用无锁算法、利用 CAS 操作来实现。

0.1 BlockingQueue

当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素、但队列为空时,消费者会被阻塞。

其实现类必须是线程安全,入队列 happen-before 出队列。

0.2 TransferQueue

继承自 BlockingQueue,更进一步:生产者会一直阻塞直到添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列)。

特别适用于这种应用间传递消息的场景:生产者有时需要等待消费者接收消息,有时只需把消息放进队列、不需要等待消费者接收。

// 传递元素给消费者,如果需要则等待。确保一次传递完成。
void transfer(E e);

// 非阻塞
boolean tryTransfer(E e);

// 基于等待时间的。
boolean tryTransfer(E e, long timeout, TimeUnit unit);

// 返回是否有在等待接收元素的消费者
// (BlockingQueue.take()或带等待时间的 poll 方法调用)
boolean hasWaitingConsumer();

// 返回大概的在等待接收元素的消费者
//(BlockingQueue.take()或带等待时间的 poll 方法调用)
int getWaitingConsumerCount();

0.3 Deque

允许在两端进行插入、删除元素的线性集合。

实现类:

  • ArrayDeque:基于数组加头尾两个指针来实现、非线程安全的。
  • LinkedList:基于双向链表实现、非线程安全的。
  • ConcurrentLinkedDeque:基于双向链表、CAS 元语实现、无界的。

0.4 BlockingDeque

当 Deque 里没有元素时阻塞消费者,当没有空闲空间时阻塞生产者。

目前只有一个实现类 LinkedBlockingDeque,使用双向链表来存储元素,支持容量限制,用一把锁来保证线程安全性。因为允许在两端进行操作,双向链表更合适。

继续阅读

JUC 延迟队列 DelayQueue

DelayQueue 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。基于 PriorityQueue 实现的延迟队列,用 ReentrantLock 提供线程安全性。

其元素必须实现 Delayed 接口。

该类可用来实现定时调度的功能,当前时间与任务的下次执行时间的距离作为延迟时间。

实现上采用 Leader_Follower 模式 的变体进行优化:leader 进行限时等待,其他线程作为 follower 无限等待。leader 在等待的过程中可能插入一个更快到期的元素,那么旧 leader 就会被作废,如果又有一个线程来获取,那么它会作为新的 leader 根据新的队列头元素进行限时等待。

继续阅读

ThreadLocal/InheritableThreadLocal 设计与源码分析

ThreadLocal 提供了线程本地的变量,每个线程只能通过 get/set 方法访问自己的变量。此类的实例通常声明为类的 private static 属性、用来把状态(比如事务ID)关联到线程上。

InheritableThreadLocal 扩展了 ThreadLocal,为子线程提供从父线程那里继承的值:在创建子线程时,子线程会接收所有可继承的线程局部变量的初始值,以获得父线程所具有的值。

1. 实现思路

如果自行实现一个 ThreadLocal,直接思路可能是:ThreadLocal 内维护一个 Map,以 线程对象为 key,value 为变量。

这个思路的问题有:
1. 当线程终止、JVM 进行垃圾回收时,这个 Map 还持有对线程的引用而没法回收线程的资源;如果 JVM 要能回收,那么必须知道有多少 ThreadLocal 实例持有对线程的引用,这会给 JVM 带来负担。
2. 为了实现 InheritableThreadLocal 时,在创建时还必须找出所有的 InheritableThreadLocal,判断父线程是否有设置变量,有的则进行拷贝变量。

从上述问题来看,实现线程本地变量至少应该考虑:
1. 线程本地变量不应该直接持有对 Thread 对象的引用,避免给 JVM 回收 Thread 带来额外的开销;
2. 为实现 InheritableThreadLocal,一个线程在哪些 InheritableThreadLocal 里设置了变量应该有个集中式的存储,这样才方便把父线程的可继承本地变量拷贝到子线程的。
3. 可能有多个线程同时对 ThreadLocal 进行设置变量,那么对 Map 的访问应当是线程安全的。

再来看下线程本地变量涉及哪些参与者:ThreadLocal 、Thread、变量值。

一个 ThreadLocal 可以持有多个 Thread 的变量,一个 Thread 也可以在多个 ThreadLocal 上设置变量。因此一个 (ThreadLocal, Thread) 的组合才能唯一确定一个线程本地变量值。

Map 只能放在 (ThreadLocal, Thread) 中的一个,前面也说了放在 ThreadLocal 上是不合适的。再来看看放在 Thread 上如何。

每个 Thread 的 Map 属性的 key 是 ThreadLocal 对象,value 是变量值,看来也能实现线程本地变量。

这样反转之后,ThreadLocal 不会持有线程的引用,线程回收不存在问题,线程的 Map 也可以在线程回收时进行回收,Map 里面保存的变量值也可以进行回收。

可继承的线程本地变量可以用另一个 Map 来维护,起到了集中存储的作用。

每个线程都只访问自己的 Map,自然没有并发的竞争。

完美!JDK 从 1.3 开始就是按这个思路去实现的。

继续阅读

基于数据库的乐观锁

涉及涉及数据库的高并发解决方案一般都会提到乐观锁。

乐观锁:在提交数据更新之前,每个事务会先检查在该事务读取数据后,有没有其他事务又修改了该数据。如果其他事务有更新的话,正在提交的事务会进行回滚。

利用数据库实现乐观锁的伪代码:

Connection conn = DriverManager.getConnection(url, user, password);
conn.setAutoCommit(false);
Statement stmt = conn.createStatement();
// step 1
int oldVersion = getOldVersion(stmt);

// step 2
// 用这个数据库连接做其他的逻辑

// step 3 可用预编译语句
int i = stmt.executeUpdate(
        "update optimistic_lock set version = " + (oldVersion + 1) + " where version = " + oldVersion);

// step 4
if (i > 0) {
    conn.commit(); // 更新成功表明数据没有被修改,提交事务。
} else {
    conn.rollback(); // 更新失败,数据被修改,回滚。
}

注意事项:

  • 前面伪代码的 step 3 和 4 应在存储过程里完成,防止 step 3 完成后,由于 GC 或网络阻塞导致 step 4 延迟执行,此时 乐观锁的记录被数据库锁定,其他请求要进行更新只能等待,被阻塞。
    可以用存储过程来替代后面的两步,或者整个逻辑都用存储过程实现:
delimiter $$

create procedure sp_update_version (newVersion int, oldVersion int) begin
    update optimistic_lock t set t.version = newVersion where t.version = oldVersion;

    if row_count() > 0 then
        commit;
    else
        rollback;
    end if;
end $$

乐观锁的缺点:

  • 会带来大数量的无效更新请求、事务回滚,给DB造成不必要的额外压力。
  • 无法保证先到先得,后面的请求可能由于并发压力小了反而有可能处理成功。

欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

基于Redis实现分布式锁

用单实例的正确实现

命令:set resource_name my_random_value NX PX timeout_millis,在 key resource_name 不存在时(NX选项的作用)设置这个key 的值为 my_random_value、超时时间设为 timeout_millisPX选项的作用)。

删除key的时候用 Lua 脚本来检测key的值是否为 my_random_value,是才允许删除;需要保证这个值在所有客户端里唯一;(借助 Lua 实现一个 CAS 操作)

一些注意点与问题

  • 一个分布式锁必须设置超时时间,否则一旦某个客户端获取锁成功后与 Redis 无法通信,那么它会一直持有这个锁,其他客户端无法获得锁。
    这个过期时间叫做锁的有效时间(lock validity time),客户端必须在这个时间内完成对资源的访问操作。

  • 设置key与超时时间必须在一个命令里完成。如果客户端在设置了key后、设置超时时间前崩溃,那么它会一直持有这个锁。

  • 释放锁时必须检查key对应的值是否与自己持有的一致(通过 Lua 实现CAS),防止误删其他客户端持有的锁。防止出现:客户端A准备释放持有的锁时,检测到值与自己持有的一致,然后由于某种原因被长时间阻塞,锁的超时时间到达后客户端B获取了锁,此时客户端A执行不检查key值的删除操作就会破坏了锁。

继续阅读

《Java 虚拟机并发编程》笔记

并发

线程数 = CPU可用核心数 / ( 1 – 阻塞系数 )
阻塞系数的取值在 0 - 1 之间,计算密集型任务的阻塞系数是 0,IO 密集型任务的阻塞系数接近于 1。

构建计算密集型并发应用程序的几点经验:
* 子任务的划分数不少于处理器核心数;
* 线程数多于处理器核心数对性能提升毫无帮助;
* 在子任务划分超过一定数量之后,再增加子任务划分数对于性能的提升将十分有限。

保持一个合理的划分数,并使所有处理器核心都有足够的工作量才是关键。

应该尽可能地提供共享不可变性,否则就应该遵循隔离可变性原则,即保证总是只有一个线程访问可变变量。

开多少个线程以及如何拆分问题都会影响到你的并发应用程序的性能,还要权衡每个子任务的工作负载和划分开销。

继续阅读

Java8 StampedLock

概述

StampedLock 是基于能力的锁,用三种模式来控制读/写访问。StampedLock 的状态包含了 版本和模式。锁获取方法根据锁的状态返回一个表示和控制访问的标志(stamp),“try”版本的这些方法可能返回一个特殊的值 0 来表示获取失败。锁释放和它的变换方法要求一个标志作为参数,如果它们不符合锁的状态就失败。这三种模式是:

  • 写:方法 writeLock 可能阻塞等待独占访问,返回一个标志,可用在方法 unlockWrite 以释放锁。也提供了无时间和带时间版本的 tryWriteLock 方法。当锁以写模式持有时,没有读锁可以获取,所有乐观性读确认将失败。

  • 读:方法 readLock 可能为非独占访问而阻塞等待,返回一个标志用于方法 unlockRead 以释放锁。也提供了无时间和带时间版本的 tryWriteLock 方法。

  • 乐观读:只有在锁当前没有以写模式持有时,方法 tryOptimisticRead 返回一个非 0 标志。如果锁自给定标志以来没有以写模式持有,方法 validate 返回 true 。这种模式可以认为是一种极弱版本的读锁,可以在任意时间被写者打破。在短的只读代码段使用乐观模式常常可以减少竞争和提升吞吐量。然而,它的使用天生是脆弱的。乐观读片段section应该只读字段并持有到本地变量,用于以后使用,在确认以后。乐观读模式里的字段读取可能很不一致,所以惯例只用于当你对数据表示足够熟悉,可以检查一致性和/或重复调用 validate() 方法。例如,这些步骤典型地在第一次读取对象或数组引用,然后访问其中字段、元素或方法时要求。

继续阅读

Java8 Striped64 和 LongAdder

数据 striping

根据维基百科的这段说明

In computer data storage, data striping is the technique of segmenting logically sequential data, such as a file, so that consecutive segments are stored on different physical storage devices.

Striping is useful when a processing device requests data more quickly than a single storage device can provide it. By spreading segments across multiple devices which can be accessed concurrently, total data throughput is increased. It is also a useful method for balancing I/O load across an array of disks. Striping is used across disk drives in redundant array of independent disks (RAID) storage, network interface controllers, different computers in clustered file systems and grid-oriented storage, and RAM in some systems.

数据 striping 就是把逻辑上连续的数据分为多个段,使这一序列的段存储在不同的物理设备上。通过把段分散到多个设备上可以增加访问并发性,从而提升总体的吞吐量。

Striped64

JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。

设计思路

这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。

表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对性能有巨大的副作用),在没有这个防备下。

部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。

一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。

继续阅读

JUC Exchanger

一、概述

Exchanger 可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

二、算法描述

基本想法是维护一个槽指向一个结点,结点包含一个准备提供(出去)的item和一个等待填充的洞。如果一个到来的“occupying”线程看到槽是空的,CAS结点到那里并等待另一个线程发起交换。第二个“fulfilling”线程看到槽是非空的,把它CAS回空,也通过CAS洞交换item,加上唤醒occupying线程,如果它是阻塞的。在每种情况下CAS可能失败,因为一个槽一开始看起来是非空的,但在CAS时不是,或者相反,所以线程需要重试这些动作。

这个简单方法在只有少量线程使用Exchanger时工作得很好,当大量线程使用单个Exchanger时性能快速恶化,由于CAS竞争。因此我们使用一个“arena, 竞技场”,基本上是一个哈希表,有动态可变数量的槽,每一个槽都可被任意线程执行交换。进入线程基于自己的线程ID选择槽,如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往 0 号槽,那里总是存在如果表收缩的话。控制这个的特殊机制如下:

继续阅读

JUC Semaphore 信号量

概要

一个计数信号量,维护了一组许可。acquire() 方法在许可可用前将阻塞,许可可用时获取,每个release() 方法添加一个许可,潜在地释放一个阻塞的获取线程。

信号量常用于约束访问一些(物理或逻辑)资源的线程数量。

信号量初始化为 1 可以用作互斥锁,更常见的是称为二进制信号量,因为它只有两个状态:有一个许可可用,或没有许可可用。当以这种方式使用时,二进制信号量有个属性,“锁”可以被其他线程而不是属主线程(信号量没有记录属主关系)释放。

公平性属性:公平策略,按FIFO顺序分配许可;非公平策略,允许闯入,即 acquire 调用线程获取许可,而不是已经在等待的线程。

一般地,信号量用于控制资源的访问应当初始化为公平的,来保证没有线程因为不能访问资源而饥饿。当把信号量用于其他同步类型的控制,非公平顺序的吞吐量优势压过公平的考量。

内存一致性效果:线程调用 “release” 之前的动作 happens-before 于成功调用 “acquire” 的其他线程的后续动作。

继续阅读