J.U.C 包

概述

J.U.C 包是java.util.concurrent包的简写。这个包在JDK5引入,大大增强了Java的并发特性。JDK7还引入ForkJoin框架。

该包提供的能力主要包括:可重入锁,具有原子性操作属性的类,线程池执行服务,调度执行服务,增强的线程安全容器,线程关卡,信号器,ForkJoin任务执行框架等等。

可重入锁

内在锁

在JDK5之前,要获得同步方法,只能通过 synchronized 关键字来达到。

synchronized 使用内在锁或监视器。每个Java对象有一个与之关联的内在锁。每当一个线程尝试访问同步块或方法时,它获取此对象上的内在锁或监视器。对于静态方法的情况,线程获取类对象上的锁。内在锁机制从写代码的角度是个简洁的方法,适用于大多数情况。

内在锁的局限:

  1. 一个线程在等待获取一个锁时没法中断。(lock interruptibly)
  2. 尝试获取锁时没法不永久等待。(try lock)
  3. 没法实现结构化的非阻塞的锁机制,因为内在锁必须在获取它的同一个块里释放。(ReentrantLock可以在一个方法里获取,在另一个方法里释放)

可重入锁

当线程请求一个由其它线程持有的对象锁时,该线程会阻塞;而当线程请求由自己持有的对象锁时,如果该锁是重入锁,请求就会成功,否则阻塞。

ReentrantLock

java.util.concurrent.locks.ReentrantLock 一个可重入的互斥锁。此锁最多支持同一个线程发起的 2147483648 个递归锁。试图超过此限制会导致由锁方法抛出的 Error。

来自JDK文档的使用示例:

class X {
     private final ReentrantLock lock = new ReentrantLock();
     // ...

     public void m() {
          lock.lock();  // block until condition holds
          try {
               // ... method body
          } finally {
               lock.unlock()
          }
     }
}

ReentrantReadWriteLock

java.util.concurrent.locks.ReentrantReadWriteLock 支持读锁、写锁分离的可重入锁,与 ReentrantLock 类似。

具有原子性操作特性的类

java.util.concurrent.atomic 包提供了大量具有原子性操作的类,提供了比volatile关键字更多的功能。

volatile关键字

volatile关键字用于保证其所修饰的字段具有内存可视性,也就是对这些字段更新后,线程后续的读操作总能看到这些更新;线程读取这些字段了,也能看到其他线程之前所作的更新。

但是volatile不支持一些基本的操作,比如并发情况下的计数问题。对于一个字段 volatile int count = 0,执行 count++; 是在并发情况下对count的值是没有安全保证的,有的线程可能读到的是0,有的可能是1

对于下面这样比较后更新的情形,volatile也没法保证并发安全的。

boolean isLoad = false;
//  ...
if (! isLoad) {
     isLoad  = true;
}

原子类就是用来解决这类需求的。

原子类

该包的类大多提供了具有CAS(Compare And Swap,比较后交换)特性的方法,用于原子性地更新字段的值。

AtomicInteger 的一些方法:

compareAndSet(int expect, int update)    如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
addAndGet(int delta)    以原子方式将给定值与当前值相加。
decrementAndGet()      以原子方式将当前值减 1。
getAndAdd(int delta)     以原子方式将给定值与当前值相加。
getAndDecrement()      以原子方式将当前值减 1。
getAndSet(int newValue)    以原子方式设置为给定值,并返回旧值。
set(int newValue)              设置为给定值。
get()                                获取当前值。

无锁算法

原子类除了解决上面的这些问题,还可以用来实现一些无锁算法。实现情形大致是这样的:

AtomicInteger  counter = new AtomicInteger(0);
//   ....

int  oldValue,  newValue;

do {
     oldValue = counter.get();

     //  synchronized  code  trunk 

     newValue = oldVaue+1;
}while( !  counter.compareAndSet(oldValue,  newValue) );

这类实现认为,如果一个变量它的值在同步代码块访问前后没有修改,就认为没有线程并发执行了这个代码块。

无锁算法基本上利用了这个思想:把同步块最小化到单个变量上。对于每个同步条件,都需要一个具有CAS方法的类;如果同步条件太多,还是用锁简单点。

ABA问题

对于上面这类无锁算法实现的一个问题是:oldValue = counter.get(); 语句执行完后, counter的值为 A,这个线程T1被挂起,另一个线程T2对 counter进行更新,把值改为B,然后又更新为A,当T1恢复执行的时候,其实上下文已经被修改,而它却没法知道,因为值还是A。这就是“ABA问题”(此处应该google)。

此包为解决ABA问题提供了一个类:AtomicStampedReference,维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。

线程池执行服务

JDK5以前的多线程

在JDK5以前,利用Java的并发能力一般有两种方式:继承自Thread类并覆写run方法; 或者实现Runnable 接口,将实现的实例传递给Thread构造函数,再调用Thread.start方法来启动一个线程。代码大概如下:

static class MyThread extends Thread {
       @Override
       public void run() {
             // do something in other thread .
      }
}

static class Task implements Runnable {
       @Override
       public void run() {
             // task command .
      }
}

public static void main(String[] args) {
      MyThread myThread = new MyThread();
      myThread.start();

      Task task = new Task();
      Thread thread = new Thread(task);
      thread.start();
}

这里要说明的是:Runnable 实现是一个任务,是指令集合,不能自己运行;Thread 才是一个可执行的单元,是一个“CPU”。是Thread驱动执行任务Runnable

对于Thread对象,当它的run方法执行完成后,线程就进入完成状态,等待销毁。

当需要异步执行一个任务时,就新建一个线程,当一个虚拟机新建了几百上千个线程时,这会带来很多问题:

  1. 上下文切换代价。Java的线程是要映射到内核线程上去执行的,线程太多时基本上就没法进行有效的调度执行,线程之间的上下文切换就会占用很多的CPU周期。
  2. 内存资源占用。JVM新建一个线程后,要给线程分配相应的方法栈、程序计数器等资源,这些都会占用的一定的内存资源,线程很多时,这些资源就不小了。

这些问题说明,大量创建线程不是个好主意,我们需要重用已创建的线程。这就是线程池的作用。

线程重用的最基本原理

既然Thread的run方法执行完成就表示一个线程已终止,那么就需要在这个run方法上做点处理,让它能持续地处理我们的任务。

import java.util.concurrent.LinkedBlockingQueue;

public class SimpleReuseThread extends Thread {
       private volatile boolean isStop = false;
       private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(
                  1024);

       public boolean submitTask(Runnable task) {
             return taskQueue .add(task);
      }

       public void run() {
             while (!isStop ) {
                   if (!taskQueue .isEmpty()) {
                        Runnable task = taskQueue.peek();
                        task.run();
                  } else {
                         try {
                              Thread. sleep(1000);
                        } catch (InterruptedException ignore) {
                        }
                  }
            }
      }

       public void shutDown() {
             isStop = true ;
      }
}

上面的代码足以说明线程复用的最基本原理。但好的线程池实现还需要处理:线程池生命周期管理、线程复用、工作任务队列管理、高效地分派任务到执行线程等工作。好消息是已经有满足这些需求的线程池实现了。

J.U.C的线程池

J.U.C的线程池封装为 java.util.concurrent.ExecutorService 接口。该接口提供了方法用于 提交任务、执行任务、查询线程池是否已被关闭、关闭线程池。

java.util.concurrent.Executors 提供了便捷的方法用于创建各种线程池:

newCachedThreadPool()                       创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
newFixedThreadPool(int nThreads)            创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
newScheduledThreadPool(int corePoolSize)    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
newScheduledThreadPool(int corePoolSize)    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
newSingleThreadExecutor()                   创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
newSingleThreadScheduledExecutor()           创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

Future

当把任务提交到线程池(java.util.concurrent.ExecutorService)去执行后,线程池会返回一个表示任务直接结果的接口Future,当前线程可以继续往下执行其他逻辑,在某个时间点,它可以查询任务是否执行完成,如果执行完成,还可以获取任务返回的结果。

来自doc文档的使用示例:

interface ArchiveSearcher { String search(String target); }
class App {
   ExecutorService executor = ...
   ArchiveSearcher searcher = ...
   void showSearch(final String target)  throws InterruptedException {

     Future<String> future = executor.submit(new Callable<String>() {
            public String call() {
                 return searcher.search(target);
            }});
     displayOtherThings(); // do other things while searching
     try {
          displayText(future.get()); // use future
     } catch (ExecutionException ex) { cleanup(); return; }
   }
}

调度执行服务

Timer

在JDK5以前,如果需要定时或周期性地执行某个任务,可以通过 java.util.Timer 类来实现。

Timer 的缺陷:

  1. 所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务;(延迟累加)
  2. Timer线程不捕获异常,如果TimerTask抛出异常,将导致Timer线程终止。如果Timer 的线程死掉了,所有任务都不会再执行。
  3. Timer对调度的支持是基于绝对时间的,而不是相对时间,因此任务对系统时钟的改变是敏感的;下面的调度服务只支持相对时间。

如果还在用 Timer 进行调度,真该好好考虑调度执行服务了。

调度服务

这里说的调度服务其实是线程池提供的扩展能力,调度服务封装为java.util.concurrent.ScheduledExecutorService 接口。它解决了上面的Timer缺陷。

schedule(Callable<V> callable, long delay, TimeUnit unit)           创建并执行在给定延迟后启用的 ScheduledFuture。
schedule(Runnable command, long delay, TimeUnit unit)               创建并执行在给定延迟后启用的一次性操作。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)             创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)           创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

先写这么多,以后再补充。

增强的线程安全容器

线程关卡

信号器

ForkJoin任务执行框架


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据