分类目录归档:并发

基于数据库的乐观锁

涉及涉及数据库的高并发解决方案一般都会提到乐观锁。

乐观锁:在提交数据更新之前,每个事务会先检查在该事务读取数据后,有没有其他事务又修改了该数据。如果其他事务有更新的话,正在提交的事务会进行回滚。

利用数据库实现乐观锁的伪代码:

Connection conn = DriverManager.getConnection(url, user, password);
conn.setAutoCommit(false);
Statement stmt = conn.createStatement();
// step 1
int oldVersion = getOldVersion(stmt);

// step 2
// 用这个数据库连接做其他的逻辑

// step 3 可用预编译语句
int i = stmt.executeUpdate(
        "update optimistic_lock set version = " + (oldVersion + 1) + " where version = " + oldVersion);

// step 4
if (i > 0) {
    conn.commit(); // 更新成功表明数据没有被修改,提交事务。
} else {
    conn.rollback(); // 更新失败,数据被修改,回滚。
}

注意事项:

  • 前面伪代码的 step 3 和 4 应在存储过程里完成,防止 step 3 完成后,由于 GC 或网络阻塞导致 step 4 延迟执行,此时 乐观锁的记录被数据库锁定,其他请求要进行更新只能等待,被阻塞。
    可以用存储过程来替代后面的两步,或者整个逻辑都用存储过程实现:
delimiter $$

create procedure sp_update_version (newVersion int, oldVersion int) begin
    update optimistic_lock t set t.version = newVersion where t.version = oldVersion;

    if row_count() > 0 then
        commit;
    else
        rollback;
    end if;
end $$

乐观锁的缺点:

  • 会带来大数量的无效更新请求、事务回滚,给DB造成不必要的额外压力。
  • 无法保证先到先得,后面的请求可能由于并发压力小了反而有可能处理成功。

基于Redis实现分布式锁

用单实例的正确实现

命令:set resource_name my_random_value NX PX timeout_millis,在 key resource_name 不存在时(NX选项的作用)设置这个key 的值为 my_random_value、超时时间设为 timeout_millisPX选项的作用)。

删除key的时候用 Lua 脚本来检测key的值是否为 my_random_value,是才允许删除;需要保证这个值在所有客户端里唯一;(借助 Lua 实现一个 CAS 操作)

一些注意点与问题

  • 一个分布式锁必须设置超时时间,否则一旦某个客户端获取锁成功后与 Redis 无法通信,那么它会一直持有这个锁,其他客户端无法获得锁。
    这个过期时间叫做锁的有效时间(lock validity time),客户端必须在这个时间内完成对资源的访问操作。

  • 设置key与超时时间必须在一个命令里完成。如果客户端在设置了key后、设置超时时间前崩溃,那么它会一直持有这个锁。

  • 释放锁时必须检查key对应的值是否与自己持有的一致(通过 Lua 实现CAS),防止误删其他客户端持有的锁。防止出现:客户端A准备释放持有的锁时,检测到值与自己持有的一致,然后由于某种原因被长时间阻塞,锁的超时时间到达后客户端B获取了锁,此时客户端A执行不检查key值的删除操作就会破坏了锁。

继续阅读

《Java 虚拟机并发编程》笔记

并发

线程数 = CPU可用核心数 / ( 1 – 阻塞系数 )
阻塞系数的取值在 0 - 1 之间,计算密集型任务的阻塞系数是 0,IO 密集型任务的阻塞系数接近于 1。

构建计算密集型并发应用程序的几点经验:
* 子任务的划分数不少于处理器核心数;
* 线程数多于处理器核心数对性能提升毫无帮助;
* 在子任务划分超过一定数量之后,再增加子任务划分数对于性能的提升将十分有限。

保持一个合理的划分数,并使所有处理器核心都有足够的工作量才是关键。

应该尽可能地提供共享不可变性,否则就应该遵循隔离可变性原则,即保证总是只有一个线程访问可变变量。

开多少个线程以及如何拆分问题都会影响到你的并发应用程序的性能,还要权衡每个子任务的工作负载和划分开销。

继续阅读

Java8 StampedLock

概述

StampedLock 是基于能力的锁,用三种模式来控制读/写访问。StampedLock 的状态包含了 版本和模式。锁获取方法根据锁的状态返回一个表示和控制访问的标志(stamp),“try”版本的这些方法可能返回一个特殊的值 0 来表示获取失败。锁释放和它的变换方法要求一个标志作为参数,如果它们不符合锁的状态就失败。这三种模式是:

  • 写:方法 writeLock 可能阻塞等待独占访问,返回一个标志,可用在方法 unlockWrite 以释放锁。也提供了无时间和带时间版本的 tryWriteLock 方法。当锁以写模式持有时,没有读锁可以获取,所有乐观性读确认将失败。

  • 读:方法 readLock 可能为非独占访问而阻塞等待,返回一个标志用于方法 unlockRead 以释放锁。也提供了无时间和带时间版本的 tryWriteLock 方法。

  • 乐观读:只有在锁当前没有以写模式持有时,方法 tryOptimisticRead 返回一个非 0 标志。如果锁自给定标志以来没有以写模式持有,方法 validate 返回 true 。这种模式可以认为是一种极弱版本的读锁,可以在任意时间被写者打破。在短的只读代码段使用乐观模式常常可以减少竞争和提升吞吐量。然而,它的使用天生是脆弱的。乐观读片段section应该只读字段并持有到本地变量,用于以后使用,在确认以后。乐观读模式里的字段读取可能很不一致,所以惯例只用于当你对数据表示足够熟悉,可以检查一致性和/或重复调用 validate() 方法。例如,这些步骤典型地在第一次读取对象或数组引用,然后访问其中字段、元素或方法时要求。

继续阅读

Java8 Striped64 和 LongAdder

数据 striping

根据维基百科的这段说明

In computer data storage, data striping is the technique of segmenting logically sequential data, such as a file, so that consecutive segments are stored on different physical storage devices.

Striping is useful when a processing device requests data more quickly than a single storage device can provide it. By spreading segments across multiple devices which can be accessed concurrently, total data throughput is increased. It is also a useful method for balancing I/O load across an array of disks. Striping is used across disk drives in redundant array of independent disks (RAID) storage, network interface controllers, different computers in clustered file systems and grid-oriented storage, and RAM in some systems.

数据 striping 就是把逻辑上连续的数据分为多个段,使这一序列的段存储在不同的物理设备上。通过把段分散到多个设备上可以增加访问并发性,从而提升总体的吞吐量。

Striped64

JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。

设计思路

这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。

表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对性能有巨大的副作用),在没有这个防备下。

部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。

一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。

继续阅读

JUC Exchanger

一、概述

Exchanger 可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

二、算法描述

基本想法是维护一个槽指向一个结点,结点包含一个准备提供(出去)的item和一个等待填充的洞。如果一个到来的“occupying”线程看到槽是空的,CAS结点到那里并等待另一个线程发起交换。第二个“fulfilling”线程看到槽是非空的,把它CAS回空,也通过CAS洞交换item,加上唤醒occupying线程,如果它是阻塞的。在每种情况下CAS可能失败,因为一个槽一开始看起来是非空的,但在CAS时不是,或者相反,所以线程需要重试这些动作。

这个简单方法在只有少量线程使用Exchanger时工作得很好,当大量线程使用单个Exchanger时性能快速恶化,由于CAS竞争。因此我们使用一个“arena, 竞技场”,基本上是一个哈希表,有动态可变数量的槽,每一个槽都可被任意线程执行交换。进入线程基于自己的线程ID选择槽,如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往 0 号槽,那里总是存在如果表收缩的话。控制这个的特殊机制如下:

继续阅读

JUC Semaphore 信号量

概要

一个计数信号量,维护了一组许可。acquire() 方法在许可可用前将阻塞,许可可用时获取,每个release() 方法添加一个许可,潜在地释放一个阻塞的获取线程。

信号量常用于约束访问一些(物理或逻辑)资源的线程数量。

信号量初始化为 1 可以用作互斥锁,更常见的是称为二进制信号量,因为它只有两个状态:有一个许可可用,或没有许可可用。当以这种方式使用时,二进制信号量有个属性,“锁”可以被其他线程而不是属主线程(信号量没有记录属主关系)释放。

公平性属性:公平策略,按FIFO顺序分配许可;非公平策略,允许闯入,即 acquire 调用线程获取许可,而不是已经在等待的线程。

一般地,信号量用于控制资源的访问应当初始化为公平的,来保证没有线程因为不能访问资源而饥饿。当把信号量用于其他同步类型的控制,非公平顺序的吞吐量优势压过公平的考量。

内存一致性效果:线程调用 “release” 之前的动作 happens-before 于成功调用 “acquire” 的其他线程的后续动作。

继续阅读

JUC CyclicBarrier 可重用屏障

介绍

CyclicBarrier 是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待,在通过临界点之后,这些线程是独立的;CountDownLatch 是让一个线程等待其他线程完成某些任务,其他线程之间一直是独立的。

CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。

在释放所有线程后,CyclicBarrier 可以通过重置状态来重用,这也是 Cyclic 的来源。

继续阅读

Future 与 FutureTask

Future

来自 Java DOC 文档:Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

也就是说Future具有这样的特性:

  • 异步执行,可用 get 方法获取执行结果。
  • 如果计算还没完成,get 方法是会阻塞的,如果完成了,是可以多次获取并立即得到结果的。
  • 如果计算还没完成,是可以取消计算的。
  • 可以查询计算的执行状态。

埋两个小问题用于设想下怎么实现Future:

  1. Future在计算完成前阻塞 get 访问,完成后可以自由访问,如何实现 get 方法?
  2. 计算的取消是怎么实现的?被取消的计算会终止执行吗?

继续阅读

CopyOnWrite 策略

CopyOnWrite 是用于解决并发读写的一种策略,在Write的时候对共享变量进行Copy,在副本上进行更新,再把更新好的副本原子性地替换原来的共享变量。

实现:

  • 读操作Read不涉及对共享变量的更改,因此不需要同步。
  • 如果有多个线程同时申请Write,在各自拷贝的副本上进行修改,然后更新回共享变量,就会导致某些线程的修改被其他线程覆盖。因此Write必须在同步的情况下进行。

优缺点:

  • 由于修改是在副本上进行,所以修改的同时允许其他线程进行读。
  • 由于需要进行拷贝,当然会存在内存占用的问题。
  • 由于在进行写的过程中仍然允许读,所以数据不是实时一致的,只有在写完成后才一致,也就是最终一致。如果需要实时的一致性,建议使用读写锁。

CopyOnWrite 策略适用于那些读远多于写、且对实时性要求不高的操作,优势在读不需要同步。

更多了解可参考 JUC 里 CopyOnWriteArrayList 的实现。