概述
J.U.C 包是java.util.concurrent
包的简写。这个包在JDK5引入,大大增强了Java的并发特性。JDK7还引入ForkJoin框架。
该包提供的能力主要包括:可重入锁,具有原子性操作属性的类,线程池执行服务,调度执行服务,增强的线程安全容器,线程关卡,信号器,ForkJoin任务执行框架等等。
可重入锁
内在锁
在JDK5之前,要获得同步方法,只能通过 synchronized
关键字来达到。
synchronized
使用内在锁或监视器。每个Java对象有一个与之关联的内在锁。每当一个线程尝试访问同步块或方法时,它获取此对象上的内在锁或监视器。对于静态方法的情况,线程获取类对象上的锁。内在锁机制从写代码的角度是个简洁的方法,适用于大多数情况。
内在锁的局限:
- 一个线程在等待获取一个锁时没法中断。(lock interruptibly)
- 尝试获取锁时没法不永久等待。(try lock)
- 没法实现结构化的非阻塞的锁机制,因为内在锁必须在获取它的同一个块里释放。(ReentrantLock可以在一个方法里获取,在另一个方法里释放)
可重入锁
当线程请求一个由其它线程持有的对象锁时,该线程会阻塞;而当线程请求由自己持有的对象锁时,如果该锁是重入锁,请求就会成功,否则阻塞。
ReentrantLock
java.util.concurrent.locks.ReentrantLock
一个可重入的互斥锁。此锁最多支持同一个线程发起的 2147483648 个递归锁。试图超过此限制会导致由锁方法抛出的 Error。
来自JDK文档的使用示例:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
ReentrantReadWriteLock
java.util.concurrent.locks.ReentrantReadWriteLock
支持读锁、写锁分离的可重入锁,与 ReentrantLock
类似。
具有原子性操作特性的类
java.util.concurrent.atomic
包提供了大量具有原子性操作的类,提供了比volatile
关键字更多的功能。
volatile关键字
volatile
关键字用于保证其所修饰的字段具有内存可视性,也就是对这些字段更新后,线程后续的读操作总能看到这些更新;线程读取这些字段了,也能看到其他线程之前所作的更新。
但是volatile
不支持一些基本的操作,比如并发情况下的计数问题。对于一个字段 volatile int count = 0
,执行 count++;
是在并发情况下对count的值是没有安全保证的,有的线程可能读到的是0
,有的可能是1
。
对于下面这样比较后更新的情形,volatile
也没法保证并发安全的。
boolean isLoad = false;
// ...
if (! isLoad) {
isLoad = true;
}
原子类就是用来解决这类需求的。
原子类
该包的类大多提供了具有CAS(Compare And Swap,比较后交换)特性的方法,用于原子性地更新字段的值。
AtomicInteger
的一些方法:
compareAndSet(int expect, int update) 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
addAndGet(int delta) 以原子方式将给定值与当前值相加。
decrementAndGet() 以原子方式将当前值减 1。
getAndAdd(int delta) 以原子方式将给定值与当前值相加。
getAndDecrement() 以原子方式将当前值减 1。
getAndSet(int newValue) 以原子方式设置为给定值,并返回旧值。
set(int newValue) 设置为给定值。
get() 获取当前值。
无锁算法
原子类除了解决上面的这些问题,还可以用来实现一些无锁算法。实现情形大致是这样的:
AtomicInteger counter = new AtomicInteger(0);
// ....
int oldValue, newValue;
do {
oldValue = counter.get();
// synchronized code trunk
newValue = oldVaue+1;
}while( ! counter.compareAndSet(oldValue, newValue) );
这类实现认为,如果一个变量它的值在同步代码块访问前后没有修改,就认为没有线程并发执行了这个代码块。
无锁算法基本上利用了这个思想:把同步块最小化到单个变量上。对于每个同步条件,都需要一个具有CAS方法的类;如果同步条件太多,还是用锁简单点。
ABA问题
对于上面这类无锁算法实现的一个问题是:oldValue = counter.get();
语句执行完后, counter
的值为 A
,这个线程T1被挂起,另一个线程T2对 counter
进行更新,把值改为B
,然后又更新为A
,当T1恢复执行的时候,其实上下文已经被修改,而它却没法知道,因为值还是A
。这就是“ABA问题”(此处应该google)。
此包为解决ABA问题提供了一个类:AtomicStampedReference
,维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
线程池执行服务
JDK5以前的多线程
在JDK5以前,利用Java的并发能力一般有两种方式:继承自Thread类并覆写run
方法; 或者实现Runnable
接口,将实现的实例传递给Thread构造函数,再调用Thread.start
方法来启动一个线程。代码大概如下:
static class MyThread extends Thread {
@Override
public void run() {
// do something in other thread .
}
}
static class Task implements Runnable {
@Override
public void run() {
// task command .
}
}
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
Task task = new Task();
Thread thread = new Thread(task);
thread.start();
}
这里要说明的是:Runnable
实现是一个任务,是指令集合,不能自己运行;Thread
才是一个可执行的单元,是一个“CPU”。是Thread
驱动执行任务Runnable
。
对于Thread
对象,当它的run
方法执行完成后,线程就进入完成状态,等待销毁。
当需要异步执行一个任务时,就新建一个线程,当一个虚拟机新建了几百上千个线程时,这会带来很多问题:
- 上下文切换代价。Java的线程是要映射到内核线程上去执行的,线程太多时基本上就没法进行有效的调度执行,线程之间的上下文切换就会占用很多的CPU周期。
- 内存资源占用。JVM新建一个线程后,要给线程分配相应的方法栈、程序计数器等资源,这些都会占用的一定的内存资源,线程很多时,这些资源就不小了。
这些问题说明,大量创建线程不是个好主意,我们需要重用已创建的线程。这就是线程池的作用。
线程重用的最基本原理
既然Thread的run方法执行完成就表示一个线程已终止,那么就需要在这个run方法上做点处理,让它能持续地处理我们的任务。
import java.util.concurrent.LinkedBlockingQueue;
public class SimpleReuseThread extends Thread {
private volatile boolean isStop = false;
private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(
1024);
public boolean submitTask(Runnable task) {
return taskQueue .add(task);
}
public void run() {
while (!isStop ) {
if (!taskQueue .isEmpty()) {
Runnable task = taskQueue.peek();
task.run();
} else {
try {
Thread. sleep(1000);
} catch (InterruptedException ignore) {
}
}
}
}
public void shutDown() {
isStop = true ;
}
}
上面的代码足以说明线程复用的最基本原理。但好的线程池实现还需要处理:线程池生命周期管理、线程复用、工作任务队列管理、高效地分派任务到执行线程等工作。好消息是已经有满足这些需求的线程池实现了。
J.U.C的线程池
J.U.C的线程池封装为 java.util.concurrent.ExecutorService
接口。该接口提供了方法用于 提交任务、执行任务、查询线程池是否已被关闭、关闭线程池。
java.util.concurrent.Executors
提供了便捷的方法用于创建各种线程池:
newCachedThreadPool() 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
newFixedThreadPool(int nThreads) 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
newScheduledThreadPool(int corePoolSize) 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
newScheduledThreadPool(int corePoolSize) 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
newSingleThreadExecutor() 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
newSingleThreadScheduledExecutor() 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
Future
当把任务提交到线程池(java.util.concurrent.ExecutorService
)去执行后,线程池会返回一个表示任务直接结果的接口Future
,当前线程可以继续往下执行其他逻辑,在某个时间点,它可以查询任务是否执行完成,如果执行完成,还可以获取任务返回的结果。
来自doc文档的使用示例:
interface ArchiveSearcher { String search(String target); }
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target) throws InterruptedException {
Future<String> future = executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) { cleanup(); return; }
}
}
调度执行服务
Timer
在JDK5以前,如果需要定时或周期性地执行某个任务,可以通过 java.util.Timer
类来实现。
Timer
的缺陷:
- 所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务;(延迟累加)
Timer
线程不捕获异常,如果TimerTask
抛出异常,将导致Timer
线程终止。如果Timer
的线程死掉了,所有任务都不会再执行。Timer
对调度的支持是基于绝对时间的,而不是相对时间,因此任务对系统时钟的改变是敏感的;下面的调度服务只支持相对时间。
如果还在用 Timer
进行调度,真该好好考虑调度执行服务了。
调度服务
这里说的调度服务其实是线程池提供的扩展能力,调度服务封装为java.util.concurrent.ScheduledExecutorService
接口。它解决了上面的Timer
缺陷。
schedule(Callable<V> callable, long delay, TimeUnit unit) 创建并执行在给定延迟后启用的 ScheduledFuture。
schedule(Runnable command, long delay, TimeUnit unit) 创建并执行在给定延迟后启用的一次性操作。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
先写这么多,以后再补充。
增强的线程安全容器
线程关卡
信号器
ForkJoin任务执行框架
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。