并发编程(下)
看待事情的本质往往是一条捷径。

内存
JMM
内存模型
Java 内存模型是 Java Memory Model(JMM),本身是一种抽象的概念,实际上并不存在,描述的是一组规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式
JMM 作用:
屏蔽各种硬件和操作系统的内存访问差异,实现让 Java 程序在各种平台下都能达到一致的内存访问效果
规定了线程和内存之间的一些关系
根据 JMM 的设计,系统存在一个主内存(Main Memory),Java 中所有变量都存储在主存中,对于所有线程都是共享的;每条线程都有自己的工作内存(Working Memory),工作内存中保存的是主存中某些变量的拷贝,线程对所有变量的操作都是先对变量进行拷贝,然后在工作内存中进行,不能直接操作主内存中的变量;线程之间无法相互直接访问,线程间的通信(传递)必须通过主内存来完成

主内存和工作内存:
主内存:计算机的内存,也就是经常提到的 8G 内存,16G 内存,存储所有共享变量的值
工作内存:存储该线程使用到的共享变量在主内存的的值的副本拷贝
JVM 和 JMM 之间的关系:JMM 中的主内存、工作内存与 JVM 中的 Java 堆、栈、方法区等并不是同一个层次的内存划分,这两者基本上是没有关系的,如果两者一定要勉强对应起来:
主内存主要对应于 Java 堆中的对象实例数据部分,而工作内存则对应于虚拟机栈中的部分区域
从更低层次上说,主内存直接对应于物理硬件的内存,工作内存对应寄存器和高速缓存
内存交互
Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作,每个操作都是原子的
非原子协定:没有被 volatile 修饰的 long、double 外,默认按照两次 32 位的操作

lock:作用于主内存,将一个变量标识为被一个线程独占状态(对应 monitorenter)
unclock:作用于主内存,将一个变量从独占状态释放出来,释放后的变量才可以被其他线程锁定(对应 monitorexit)
read:作用于主内存,把一个变量的值从主内存传输到工作内存中
load:作用于工作内存,在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
use:作用于工作内存,把工作内存中一个变量的值传递给执行引擎,每当遇到一个使用到变量的操作时都要使用该指令
assign:作用于工作内存,把从执行引擎接收到的一个值赋给工作内存的变量
store:作用于工作内存,把工作内存的一个变量的值传送到主内存中
write:作用于主内存,在 store 之后执行,把 store 得到的值放入主内存的变量中
三大特性
可见性
可见性:是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值
存在不可见问题的根本原因是由于缓存的存在,线程持有的是共享变量的副本,无法感知其他线程对于共享变量的更改,导致读取的值不是最新的。但是 final 修饰的变量是不可变的,就算有缓存,也不会存在不可见的问题
main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:
static boolean run = true; //添加volatile
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()->{
while(run){
// ....
}
});
t.start();
sleep(1);
run = false; // 线程t不会如预想的停下来
}原因:
初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存
因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中,减少对主存中 run 的访问,提高效率
1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值

原子性
原子性:不可分割,完整性,也就是说某个线程正在做某个具体业务时,中间不可以被分割,需要具体完成,要么同时成功,要么同时失败,保证指令不会受到线程上下文切换的影响
定义原子操作的使用规则:
不允许 read 和 load、store 和 write 操作之一单独出现,必须顺序执行,但是不要求连续
不允许一个线程丢弃 assign 操作,必须同步回主存
不允许一个线程无原因地(没有发生过任何 assign 操作)把数据从工作内存同步会主内存中
一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(assign 或者 load)的变量,即对一个变量实施 use 和 store 操作之前,必须先自行 assign 和 load 操作
一个变量在同一时刻只允许一条线程对其进行 lock 操作,但 lock 操作可以被同一线程重复执行多次,多次执行 lock 后,只有执行相同次数的 unlock 操作,变量才会被解锁,lock 和 unlock 必须成对出现
如果对一个变量执行 lock 操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量之前需要重新从主存加载
如果一个变量事先没有被 lock 操作锁定,则不允许执行 unlock 操作,也不允许去 unlock 一个被其他线程锁定的变量
对一个变量执行 unlock 操作之前,必须先把此变量同步到主内存中(执行 store 和 write 操作)
有序性
有序性:在本线程内观察,所有操作都是有序的;在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序
CPU 的基本工作是执行存储的指令序列,即程序,程序的执行过程实际上是不断地取出指令、分析指令、执行指令的过程,为了提高性能,编译器和处理器会对指令重排,一般分为以下三种:
源代码 -> 编译器优化的重排 -> 指令并行的重排 -> 内存系统的重排 -> 最终执行指令
现代 CPU 支持多级指令流水线,几乎所有的冯•诺伊曼型计算机的 CPU,其工作都可以分为 5 个阶段:取指令、指令译码、执行指令、访存取数和结果写回,可以称之为五级指令流水线。CPU 可以在一个时钟周期内,同时运行五条指令的不同阶段(每个线程不同的阶段),本质上流水线技术并不能缩短单条指令的执行时间,但变相地提高了指令地吞吐率
处理器在进行重排序时,必须要考虑指令之间的数据依赖性
单线程环境也存在指令重排,由于存在依赖性,最终执行结果和代码顺序的结果一致
多线程环境中线程交替执行,由于编译器优化重排,会获取其他线程处在不同阶段的指令同时执行
补充知识:
指令周期是取出一条指令并执行这条指令的时间,一般由若干个机器周期组成
机器周期也称为 CPU 周期,一条指令的执行过程划分为若干个阶段(如取指、译码、执行等),每一阶段完成一个基本操作,完成一个基本操作所需要的时间称为机器周期
振荡周期指周期性信号作周期性重复变化的时间间隔
cache
缓存机制
缓存结构
在计算机系统中,CPU 高速缓存(CPU Cache,简称缓存)是用于减少处理器访问内存所需平均时间的部件;在存储体系中位于自顶向下的第二层,仅次于 CPU 寄存器;其容量远小于内存,但速度却可以接近处理器的频率
CPU 处理器速度远远大于在主内存中的,为了解决速度差异,在它们之间架设了多级缓存,如 L1、L2、L3 级别的缓存,这些缓存离 CPU 越近就越快,将频繁操作的数据缓存到这里,加快访问速度

| 从 CPU 到 | 大约需要的时钟周期 |
|---|---|
| 寄存器 | 1 cycle (4GHz 的 CPU 约为 0.25ns) |
| L1 | 3~4 cycle |
| L2 | 10~20 cycle |
| L3 | 40~45 cycle |
| 内存 | 120~240 cycle |
缓存使用
当处理器发出内存访问请求时,会先查看缓存内是否有请求数据,如果存在(命中),则不用访问内存直接返回该数据;如果不存在(失效),则要先把内存中的相应数据载入缓存,再将其返回处理器
缓存之所以有效,主要因为程序运行时对内存的访问呈现局部性(Locality)特征。既包括空间局部性(Spatial Locality),也包括时间局部性(Temporal Locality),有效利用这种局部性,缓存可以达到极高的命中率
伪共享
缓存以缓存行 cache line 为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),在 CPU 从主存获取数据时,以 cache line 为单位加载,于是相邻的数据会一并加载到缓存中
缓存会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,需要做到某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效,这就是伪共享

解决方法:
padding:通过填充,让数据落在不同的 cache line 中
@Contended:原理参考 无锁 → Adder → 优化机制 → 伪共享
Linux 查看 CPU 缓存行:
命令:
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size64内存地址格式: [ 高位组标记 ] [ 低位索引 ] [ 偏移量 ]
缓存一致
缓存一致性:当多个处理器运算任务都涉及到同一块主内存区域的时候,将可能导致各自的缓存数据不一样

MESI(Modified Exclusive Shared Or Invalid)是一种广泛使用的支持写回策略的缓存一致性协议,CPU 中每个缓存行(caceh line)使用 4 种状态进行标记(使用额外的两位 bit 表示):
M:被修改(Modified)
该缓存行只被缓存在该 CPU 的缓存中,并且是被修改过的,与主存中的数据不一致 (dirty),该缓存行中的内存需要写回 (write back) 主存。该状态的数据再次被修改不会发送广播,因为其他核心的数据已经在第一次修改时失效一次
当被写回主存之后,该缓存行的状态会变成独享 (exclusive) 状态
E:独享的(Exclusive)
该缓存行只被缓存在该 CPU 的缓存中,是未被修改过的 (clear),与主存中数据一致,修改数据不需要通知其他 CPU 核心,该状态可以在任何时刻有其它 CPU 读取该内存时变成共享状态 (shared)
当 CPU 修改该缓存行中内容时,该状态可以变成 Modified 状态
S:共享的(Shared)
该状态意味着该缓存行可能被多个 CPU 缓存,并且各个缓存中的数据与主存数据一致,当 CPU 修改该缓存行中,会向其它 CPU 核心广播一个请求,使该缓存行变成无效状态 (Invalid),然后再更新当前 Cache 里的数据
I:无效的(Invalid)
该缓存是无效的,可能有其它 CPU 修改了该缓存行
解决方法:各个处理器访问缓存时都遵循一些协议,在读写时要根据协议进行操作,协议主要有 MSI、MESI 等
处理机制
单核 CPU 处理器会自动保证基本内存操作的原子性
多核 CPU 处理器,每个 CPU 处理器内维护了一块内存,每个内核内部维护着一块缓存,当多线程并发读写时,就会出现缓存数据不一致的情况。处理器提供:
总线锁定:当处理器要操作共享变量时,在 BUS 总线上发出一个 LOCK 信号,其他处理器就无法操作这个共享变量,该操作会导致大量阻塞,从而增加系统的性能开销(平台级别的加锁)
缓存锁定:当处理器对缓存中的共享变量进行了操作,其他处理器有嗅探机制,将各自缓存中的该共享变量的失效,读取时会重新从主内存中读取最新的数据,基于 MESI 缓存一致性协议来实现
有如下两种情况处理器不会使用缓存锁定:
当操作的数据跨多个缓存行,或没被缓存在处理器内部,则处理器会使用总线锁定
有些处理器不支持缓存锁定,比如:Intel 486 和 Pentium 处理器也会调用总线锁定
总线机制:
总线嗅探:每个处理器通过嗅探在总线上传播的数据来检查自己缓存值是否过期了,当处理器发现自己的缓存对应的内存地址的数据被修改,就将当前处理器的缓存行设置为无效状态,当处理器对这个数据进行操作时,会重新从内存中把数据读取到处理器缓存中
总线风暴:当某个 CPU 核心更新了 Cache 中的数据,要把该事件广播通知到其他核心(写传播),CPU 需要每时每刻监听总线上的一切活动,但是不管别的核心的 Cache 是否缓存相同的数据,都需要发出一个广播事件,不断的从主内存嗅探和 CAS 循环,无效的交互会导致总线带宽达到峰值;因此不要大量使用 volatile 关键字,使用 volatile、syschonized 都需要根据实际场景
volatile
同步机制
volatile 是 Java 虚拟机提供的轻量级的同步机制(三大特性)
保证可见性
不保证原子性
保证有序性(禁止指令重排)
性能:volatile 修饰的变量进行读操作与普通变量几乎没什么差别,但是写操作相对慢一些,因为需要在本地代码中插入很多内存屏障来保证指令不会发生乱序执行,但是开销比锁要小
synchronized 无法禁止指令重排和处理器优化,为什么可以保证有序性可见性
加了锁之后,只能有一个线程获得到了锁,获得不到锁的线程就要阻塞,所以同一时间只有一个线程执行,相当于单线程,由于数据依赖性的存在,单线程的指令重排是没有问题的
线程加锁前,将清空工作内存中共享变量的值,使用共享变量时需要从主内存中重新读取最新的值;线程解锁前,必须把共享变量的最新值刷新到主内存中(JMM 内存交互章节有讲)
指令重排
volatile 修饰的变量,可以禁用指令重排
指令重排实例:
- example 1:
public void mySort() {
int x = 11; //语句1
int y = 12; //语句2 谁先执行效果一样
x = x + 5; //语句3
y = x * x; //语句4
}
执行顺序是:1 2 3 4、2 1 3 4、1 3 2 4
指令重排也有限制不会出现:4321,语句 4 需要依赖于 y 以及 x 的申明,因为存在数据依赖,无法首先执行- example 2:
int num = 0;
boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {
if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
// 线程2 执行此方法
public void actor2(I_Result r) {
num = 2;
ready = true;
}
情况一:线程 1 先执行,ready = false,结果为 r.r1 = 1
情况二:线程 2 先执行 num = 2,但还没执行 ready = true,线程 1 执行,结果为 r.r1 = 1
情况三:线程 2 先执行 ready = true,线程 1 执行,进入 if 分支结果为 r.r1 = 4
情况四:线程 2 执行 ready = true,切换到线程 1,进入 if 分支为 r.r1 = 0,再切回线程 2 执行 num = 2,发生指令重排底层原理
缓存一致
使用 volatile 修饰的共享变量,底层通过汇编 lock 前缀指令进行缓存锁定,在线程修改完共享变量后写回主存,其他的 CPU 核心上运行的线程通过 CPU 总线嗅探机制会修改其共享变量为失效状态,读取时会重新从主内存中读取最新的数据
lock 前缀指令就相当于内存屏障,Memory Barrier(Memory Fence)
对 volatile 变量的写指令后会加入写屏障
对 volatile 变量的读指令前会加入读屏障
内存屏障有三个作用:
确保对内存的读-改-写操作原子执行
阻止屏障两侧的指令重排序
强制把缓存中的脏数据写回主内存,让缓存行中相应的数据失效
内存屏障
保证可见性:
- 写屏障(sfence,Store Barrier)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
public void actor2(I_Result r) {
num = 2;
ready = true; // ready 是 volatile 赋值带写屏障
// 写屏障
}- 读屏障(lfence,Load Barrier)保证在该屏障之后的,对共享变量的读取,从主存刷新变量值,加载的是主存中最新数据
public void actor1(I_Result r) {
// 读屏障
// ready 是 volatile 读取值带读屏障
if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
- 全能屏障:mfence(modify/mix Barrier),兼具 sfence 和 lfence 的功能
保证有序性:
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
不能解决指令交错:
写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证其他线程的读跑到写屏障之前
有序性的保证也只是保证了本线程内相关代码不被重排序
volatile i = 0;
new Thread(() -> {i++});
new Thread(() -> {i--});i++ 反编译后的指令:
0: iconst_1 // 当int取值 -1~5 时,JVM采用iconst指令将常量压入栈中
1: istore_1 // 将操作数栈顶数据弹出,存入局部变量表的 slot 1
2: iinc 1, 1

交互规则
对于 volatile 修饰的变量:
线程对变量的 use 与 load、read 操作是相关联的,所以变量使用前必须先从主存加载
线程对变量的 assign 与 store、write 操作是相关联的,所以变量使用后必须同步至主存
线程 1 和线程 2 谁先对变量执行 read 操作,就会先进行 write 操作,防止指令重排
双端检锁
检锁机制
Double-Checked Locking:双端检锁机制
DCL(双端检锁)机制不一定是线程安全的,原因是有指令重排的存在,加入 volatile 可以禁止指令重排
public final class Singleton {
private Singleton() { }
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
if(INSTANCE == null) { // t2,这里的判断不是线程安全的
// 首次访问会同步,而之后的使用没有 synchronized
synchronized(Singleton.class) {
// 这里是线程安全的判断,防止其他线程在当前线程等待锁的期间完成了初始化
if (INSTANCE == null) {
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}不锁 INSTANCE 的原因:
INSTANCE 要重新赋值
INSTANCE 是 null,线程加锁之前需要获取对象的引用,设置对象头,null 没有引用
实现特点:
懒惰初始化
首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
第一个 if 使用了 INSTANCE 变量,是在同步块之外,但在多线程环境下会产生问题
DCL问题
getInstance 方法对应的字节码为:
0: getstatic #2 // Field INSTANCE:Ltest/Singleton;
3: ifnonnull 37
6: ldc #3 // class test/Singleton
8: dup
9: astore_0
10: monitorenter
11: getstatic #2 // Field INSTANCE:Ltest/Singleton;
14: ifnonnull 27
17: new #3 // class test/Singleton
20: dup
21: invokespecial #4 // Method "<init>":()V
24: putstatic #2 // Field INSTANCE:Ltest/Singleton;
27: aload_0
28: monitorexit
29: goto 37
32: astore_1
33: aload_0
34: monitorexit
35: aload_1
36: athrow
37: getstatic #2 // Field INSTANCE:Ltest/Singleton;
40: areturn17 表示创建对象,将对象引用入栈
20 表示复制一份对象引用,引用地址
21 表示利用一个对象引用,调用构造方法初始化对象
24 表示利用一个对象引用,赋值给 static INSTANCE
步骤 21 和 24 之间不存在数据依赖关系,而且无论重排前后,程序的执行结果在单线程中并没有改变,因此这种重排优化是允许的
关键在于 0:getstatic 这行代码在 monitor 控制之外,可以越过 monitor 读取 INSTANCE 变量的值
当其他线程访问 INSTANCE 不为 null 时,由于 INSTANCE 实例未必已初始化,那么 t2 拿到的是将是一个未初始化完毕的单例返回,这就造成了线程安全的问题

解决方法
指令重排只会保证串行语义的执行一致性(单线程),但并不会关系多线程间的语义一致性
引入 volatile,来保证出现指令重排的问题,从而保证单例模式的线程安全性:
private static volatile SingletonDemo INSTANCE = null;
happens-before
happens-before 先行发生
Java 内存模型具备一些先天的“有序性”,即不需要通过任何同步手段(volatile、synchronized 等)就能够得到保证的安全,这个通常也称为 happens-before 原则,它是可见性与有序性的一套规则总结
不符合 happens-before 规则,JMM 并不能保证一个线程的可见性和有序性
程序次序规则 (Program Order Rule):一个线程内,逻辑上书写在前面的操作先行发生于书写在后面的操作 ,因为多个操作之间有先后依赖关系,则不允许对这些操作进行重排序
锁定规则 (Monitor Lock Rule):一个 unlock 操作先行发生于后面(时间的先后)对同一个锁的 lock 操作,所以线程解锁 m 之前对变量的写(解锁前会刷新到主内存中),对于接下来对 m 加锁的其它线程对该变量的读可见
volatile 变量规则 (Volatile Variable Rule):对 volatile 变量的写操作先行发生于后面对这个变量的读
传递规则 (Transitivity):具有传递性,如果操作 A 先行发生于操作 B,而操作 B 又先行发生于操作 C,则可以得出操作 A 先行发生于操作 C
线程启动规则 (Thread Start Rule):Thread 对象的 start()方 法先行发生于此线程中的每一个操作
static int x = 10;//线程 start 前对变量的写,对该线程开始后对该变量的读可见
new Thread(()->{ System.out.println(x); },"t1").start();线程中断规则 (Thread Interruption Rule):对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生
线程终止规则 (Thread Termination Rule):线程中所有的操作都先行发生于线程的终止检测,可以通过 Thread.join() 方法结束、Thread.isAlive() 的返回值手段检测到线程已经终止执行
对象终结规则(Finaizer Rule):一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始
设计模式
终止模式
终止模式之两阶段终止模式:停止标记用 volatile 是为了保证该变量在多个线程之间的可见性
class TwoPhaseTermination {
// 监控线程
private Thread monitor;
// 停止标记
private volatile boolean stop = false;;
// 启动监控线程
public void start() {
monitor = new Thread(() -> {
while (true) {
Thread thread = Thread.currentThread();
if (stop) {
System.out.println("后置处理");
break;
}
try {
Thread.sleep(1000);// 睡眠
System.out.println(thread.getName() + "执行监控记录");
} catch (InterruptedException e) {
System.out.println("被打断,退出睡眠");
}
}
});
monitor.start();
}
// 停止监控线程
public void stop() {
stop = true;
monitor.interrupt();// 让线程尽快退出Timed Waiting
}
}
// 测试
public static void main(String[] args) throws InterruptedException {
TwoPhaseTermination tpt = new TwoPhaseTermination();
tpt.start();
Thread.sleep(3500);
System.out.println("停止监控");
tpt.stop();
}Balking
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
public class MonitorService {
// 用来表示是否已经有线程已经在执行启动了
private volatile boolean starting = false;
public void start() {
System.out.println("尝试启动监控线程...");
synchronized (this) {
if (starting) {
return;
}
starting = true;
}
// 真正启动监控线程...
}
}对比保护性暂停模式:保护性暂停模式用在一个线程等待另一个线程的执行结果,当条件不满足时线程等待
例子:希望 doInit() 方法仅被调用一次,下面的实现出现的问题:
当 t1 线程进入 init() 准备 doInit(),t2 线程进来,initialized 还为f alse,则 t2 就又初始化一次
volatile 适合一个线程写,其他线程读的情况,这个代码需要加锁
public class TestVolatile {
volatile boolean initialized = false;
void init() {
if (initialized) {
return;
}
doInit();
initialized = true;
}
private void doInit() {
}
}无锁
CAS
原理
无锁编程:Lock Free
CAS 的全称是 Compare-And-Swap,是 CPU 并发原语
CAS 并发原语体现在 Java 语言中就是 sun.misc.Unsafe 类的各个方法,调用 UnSafe 类中的 CAS 方法,JVM 会实现出 CAS 汇编指令,这是一种完全依赖于硬件的功能,实现了原子操作
CAS 是一种系统原语,原语属于操作系统范畴,是由若干条指令组成 ,用于完成某个功能的一个过程,并且原语的执行必须是连续的,执行过程中不允许被中断,所以 CAS 是一条 CPU 的原子指令,不会造成数据不一致的问题,是线程安全的
底层原理:CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核和多核 CPU 下都能够保证比较交换的原子性
程序是在单核处理器上运行,会省略 lock 前缀,单处理器自身会维护处理器内的顺序一致性,不需要 lock 前缀的内存屏障效果
程序是在多核处理器上运行,会为 cmpxchg 指令加上 lock 前缀。当某个核执行到带 lock 的指令时,CPU 会执行总线锁定或缓存锁定,将修改的变量写入到主存,这个过程不会被线程的调度机制所打断,保证了多个线程对内存操作的原子性
作用:比较当前工作内存中的值和主物理内存中的值,如果相同则执行规定操作,否则继续比较直到主内存和工作内存的值一致为止
CAS 特点:
CAS 体现的是无锁并发、无阻塞并发,线程不会陷入阻塞,线程不需要频繁切换状态(上下文切换,系统调用)
CAS 是基于乐观锁的思想
CAS 缺点:
执行的是循环操作,如果比较不成功一直在循环,最差的情况某个线程一直取到的值和预期值都不一样,就会无限循环导致饥饿,使用 CAS 线程数不要超过 CPU 的核心数,采用分段 CAS 和自动迁移机制
只能保证一个共享变量的原子操作
对于一个共享变量执行操作时,可以通过循环 CAS 的方式来保证原子操作
对于多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候只能用锁来保证原子性
引出来 ABA 问题
乐观锁
CAS 与 synchronized 总结:
synchronized 是从悲观的角度出发:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程),因此 synchronized 也称之为悲观锁,ReentrantLock 也是一种悲观锁,性能较差
CAS 是从乐观的角度出发:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。如果别人修改过,则获取现在最新的值,如果别人没修改过,直接修改共享数据的值,CAS 这种机制也称之为乐观锁,综合性能较好
Atomic
常用API
常见原子类:AtomicInteger、AtomicBoolean、AtomicLong
构造方法:
public AtomicInteger():初始化一个默认值为 0 的原子型 Integerpublic AtomicInteger(int initialValue):初始化一个指定值的原子型 Integer
常用API:
| 方法 | 作用 |
|---|---|
| public final int get() | 获取 AtomicInteger 的值 |
| public final int getAndIncrement() | 以原子方式将当前值加 1,返回的是自增前的值 |
| public final int incrementAndGet() | 以原子方式将当前值加 1,返回的是自增后的值 |
| public final int getAndSet(int value) | 以原子方式设置为 newValue 的值,返回旧值 |
| public final int addAndGet(int data) | 以原子方式将输入的数值与实例中的值相加并返回 实例:AtomicInteger 里的 value |
原理分析
AtomicInteger 原理:自旋锁 + CAS 算法
CAS 算法:有 3 个操作数(内存值 V, 旧的预期值 A,要修改的值 B)
当旧的预期值 A == 内存值 V 此时可以修改,将 V 改为 B
当旧的预期值 A != 内存值 V 此时不能修改,并重新获取现在的最新值,重新获取的动作就是自旋
分析 getAndSet 方法:
- AtomicInteger:
public final int getAndSet(int newValue) {
/**
* this: 当前对象
* valueOffset: 内存偏移量,内存地址
*/
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
valueOffset:偏移量表示该变量值相对于当前对象地址的偏移,Unsafe 就是根据内存偏移地址获取数据
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
//调用本地方法 -->
public native long objectFieldOffset(Field var1);- unsafe 类:
// val1: AtomicInteger对象本身,var2: 该对象值得引用地址,var4: 需要变动的数
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
// var5: 用 var1 和 var2 找到的内存中的真实值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}
var5:从主内存中拷贝到工作内存中的值(每次都要从主内存拿到最新的值到本地内存),然后执行 `compareAndSwapInt()` 再和主内存的值进行比较,假设方法返回 false,那么就一直执行 while 方法,直到期望的值和真实值一样,修改数据变量 value 用 volatile 修饰,保证了多线程之间的内存可见性,避免线程从工作缓存中获取失效的变量
private volatile int value
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现比较并交换的效果
分析 getAndUpdate 方法:
- getAndUpdate:
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get(); //当前值,cas的期望值
next = updateFunction.applyAsInt(prev);//期望值更新到该值
} while (!compareAndSet(prev, next));//自旋
return prev;
}
函数式接口:可以自定义操作逻辑
AtomicInteger a = new AtomicInteger();
a.getAndUpdate(i -> i + 10);- compareAndSet:
public final boolean compareAndSet(int expect, int update) {
/**
* this: 当前对象
* valueOffset: 内存偏移量,内存地址
* expect: 期望的值
* update: 更新的值
*/
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}原子引用
原子引用:对 Object 进行原子操作,提供一种读和写都是原子性的对象引用变量
原子引用类:AtomicReference、AtomicStampedReference、AtomicMarkableReference
AtomicReference 类:
构造方法:
AtomicReference<T> atomicReference = new AtomicReference<T>()常用 API:
public final boolean compareAndSet(V expectedValue, V newValue):CAS 操作public final void set(V newValue):将值设置为 newValuepublic final V get():返回当前值
public class AtomicReferenceDemo {
public static void main(String[] args) {
Student s1 = new Student(33, "z3");
// 创建原子引用包装类
AtomicReference<Student> atomicReference = new AtomicReference<>();
// 设置主内存共享变量为s1
atomicReference.set(s1);
// 比较并交换,如果现在主物理内存的值为 z3,那么交换成 l4
while (true) {
Student s2 = new Student(44, "l4");
if (atomicReference.compareAndSet(s1, s2)) {
break;
}
}
System.out.println(atomicReference.get());
}
}
class Student {
private int id;
private String name;
//。。。。
}原子数组
原子数组类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
AtomicIntegerArray 类方法:
/**
* i the index
* expect the expected value
* update the new value
*/
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}原子更新器
原子更新器类:AtomicReferenceFieldUpdater、AtomicIntegerFieldUpdater、AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常 IllegalArgumentException: Must be volatile type
常用 API:
static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> c, String fieldName):构造方法abstract boolean compareAndSet(T obj, int expect, int update):CAS
public class UpdateDemo {
private volatile int field;
public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater
.newUpdater(UpdateDemo.class, "field");
UpdateDemo updateDemo = new UpdateDemo();
fieldUpdater.compareAndSet(updateDemo, 0, 10);
System.out.println(updateDemo.field);//10
}
}原子累加器
原子累加器类:LongAdder、DoubleAdder、LongAccumulator、DoubleAccumulator
LongAdder 和 LongAccumulator 区别:
相同点:
LongAddr 与 LongAccumulator 类都是使用非阻塞算法 CAS 实现的
LongAddr 类是 LongAccumulator 类的一个特例,只是 LongAccumulator 提供了更强大的功能,可以自定义累加规则,当accumulatorFunction 为 null 时就等价于 LongAddr
不同点:
调用 casBase 时,LongAccumulator 使用 function.applyAsLong(b = base, x) 来计算,LongAddr 使用 casBase(b = base, b + x)
LongAccumulator 类功能更加强大,构造方法参数中
accumulatorFunction 是一个双目运算器接口,可以指定累加规则,比如累加或者相乘,其根据输入的两个参数返回一个计算值,LongAdder 内置累加规则
identity 则是 LongAccumulator 累加器的初始值,LongAccumulator 可以为累加器提供非0的初始值,而 LongAdder 只能提供默认的 0
Adder
优化机制
LongAdder 是 Java8 提供的类,跟 AtomicLong 有相同的效果,但对 CAS 机制进行了优化,尝试使用分段 CAS 以及自动分段迁移的方式来大幅度提升多线程高并发执行 CAS 操作的性能
CAS 底层实现是在一个循环中不断地尝试修改目标值,直到修改成功。如果竞争不激烈修改成功率很高,否则失败率很高,失败后这些重复的原子性操作会耗费性能(导致大量线程空循环,自旋转)
优化核心思想:数据分离,将 AtomicLong 的单点的更新压力分担到各个节点,空间换时间,在低并发的时候直接更新,可以保障和 AtomicLong 的性能基本一致,而在高并发的时候通过分散减少竞争,提高了性能
分段 CAS 机制:
在发生竞争时,创建 Cell 数组用于将不同线程的操作离散(通过 hash 等算法映射)到不同的节点上
设置多个累加单元(会根据需要扩容,最大为 CPU 核数),Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1] 等,最后将结果汇总
在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能
自动分段迁移机制:某个 Cell 的 value 执行 CAS 失败,就会自动寻找另一个 Cell 分段内的 value 值进行 CAS 操作
伪共享
Cell 为累加单元:数组访问索引是通过 Thread 里的 threadLocalRandomProbe 域取模实现的,这个域是 ThreadLocalRandom 更新的
// Striped64.Cell
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 用 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}Cell 是数组形式,在内存中是连续存储的,64 位系统中,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),每一个 cache line 为 64 字节,因此缓存行可以存下 2 个的 Cell 对象,当 Core-0 要修改 Cell[0]、Core-1 要修改 Cell[1],无论谁修改成功都会导致当前缓存行失效,从而导致对方的数据失效,需要重新去主存获取,影响效率

@sun.misc.Contended:防止缓存行伪共享,在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,使用 2 倍于大多数硬件缓存行让 CPU 将对象预读至缓存时占用不同的缓存行,这样就不会造成对方缓存行的失效

源码解析
Striped64 类成员属性:
// 表示当前计算机CPU数量
static final int NCPU = Runtime.getRuntime().availableProcessors()
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域,当 cells 扩容时,也会将数据写到 base 中
transient volatile long base;
// 在 cells 初始化或扩容时只能有一个线程执行, 通过 CAS 更新 cellsBusy 置为 1 来实现一个锁
transient volatile int cellsBusy;
工作流程:
cells 占用内存是相对比较大的,是惰性加载的,在无竞争或者其他线程正在初始化 cells 数组的情况下,直接更新 base 域
在第一次发生竞争时(casBase 失败)会创建一个大小为 2 的 cells 数组,将当前累加的值包装为 Cell 对象,放入映射的槽位上
分段累加的过程中,如果当前线程对应的 cells 槽位为空,就会新建 Cell 填充,如果出现竞争,就会重新计算线程对应的槽位,继续自旋尝试修改
分段迁移后还出现竞争就会扩容 cells 数组长度为原来的两倍,然后 rehash,数组长度总是 2 的 n 次幂,默认最大为 CPU 核数,但是可以超过,如果核数是 6 核,数组最长是 8
方法分析:
LongAdder#add:累加方法
public void add(long x) {
// as 为累加单元数组的引用,b 为基础值,v 表示期望值
// m 表示 cells 数组的长度 - 1,a 表示当前线程命中的 cell 单元格
Cell[] as; long b, v; int m; Cell a;
// cells 不为空说明 cells 已经被初始化,线程发生了竞争,去更新对应的 cell 槽位
// 进入 || 后的逻辑去更新 base 域,更新失败表示发生竞争进入条件
if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended 为 true 表示 cell 没有竞争
boolean uncontended = true;
// 条件一: true 说明 cells 未初始化,多线程写 base 发生竞争需要进行初始化 cells 数组
// fasle 说明 cells 已经初始化,进行下一个条件寻找自己的 cell 去累加
// 条件二: getProbe() 获取 hash 值,& m 的逻辑和 HashMap 的逻辑相同,保证散列的均匀性
// true 说明当前线程对应下标的 cell 为空,需要创建 cell
// false 说明当前线程对应的 cell 不为空,进行下一个条件【将 x 值累加到对应的 cell 中】
// 条件三: 有取反符号,false 说明 cas 成功,直接返回,true 说明失败,当前线程对应的 cell 有竞争
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
// 【uncontended 在对应的 cell 上累加失败的时候才为 false,其余情况均为 true】
}
}Striped64#longAccumulate:cell 数组创建
// x null false | true
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 当前线程还没有对应的 cell, 需要随机生成一个 hash 值用来将当前线程绑定到 cell
if ((h = getProbe()) == 0) {
// 初始化 probe,获取 hash 值
ThreadLocalRandom.current();
h = getProbe();
// 默认情况下 当前线程肯定是写入到了 cells[0] 位置,不把它当做一次真正的竞争
wasUncontended = true;
}
// 表示【扩容意向】,false 一定不会扩容,true 可能会扩容
boolean collide = false;
//自旋
for (;;) {
// as 表示cells引用,a 表示当前线程命中的 cell,n 表示 cells 数组长度,v 表示 期望值
Cell[] as; Cell a; int n; long v;
// 【CASE1】: 表示 cells 已经初始化了,当前线程应该将数据写入到对应的 cell 中
if ((as = cells) != null && (n = as.length) > 0) {
// CASE1.1: true 表示当前线程对应的索引下标的 Cell 为 null,需要创建 new Cell
if ((a = as[(n - 1) & h]) == null) {
// 判断 cellsBusy 是否被锁
if (cellsBusy == 0) {
// 创建 cell, 初始累加值为 x
Cell r = new Cell(x);
// 加锁
if (cellsBusy == 0 && casCellsBusy()) {
// 创建成功标记,进入【创建 cell 逻辑】
boolean created = false;
try {
Cell[] rs; int m, j;
// 把当前 cells 数组赋值给 rs,并且不为 null
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
// 再次判断防止其它线程初始化过该位置,当前线程再次初始化该位置会造成数据丢失
// 因为这里是线程安全的判断,进行的逻辑不会被其他线程影响
rs[j = (m - 1) & h] == null) {
// 把新创建的 cell 填充至当前位置
rs[j] = r;
created = true; // 表示创建完成
}
} finally {
cellsBusy = 0; // 解锁
}
if (created) // true 表示创建完成,可以推出循环了
break;
continue;
}
}
collide = false;
}
// CASE1.2: 条件成立说明线程对应的 cell 有竞争, 改变线程对应的 cell 来重试 cas
else if (!wasUncontended)
wasUncontended = true;
// CASE 1.3: 当前线程 rehash 过,如果新命中的 cell 不为空,就尝试累加,false 说明新命中也有竞争
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
// CASE 1.4: cells 长度已经超过了最大长度 CPU 内核的数量或者已经扩容
else if (n >= NCPU || cells != as)
collide = false; // 扩容意向改为false,【表示不能扩容了】
// CASE 1.5: 更改扩容意向,如果 n >= NCPU,这里就永远不会执行到,case1.4 永远先于 1.5 执行
else if (!collide)
collide = true;
// CASE 1.6: 【扩容逻辑】,进行加锁
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 线程安全的检查,防止期间被其他线程扩容了
if (cells == as) {
// 扩容为以前的 2 倍
Cell[] rs = new Cell[n << 1];
// 遍历移动值
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 把扩容后的引用给 cells
cells = rs;
}
} finally {
cellsBusy = 0; // 解锁
}
collide = false; // 扩容意向改为 false,表示不扩容了
continue;
}
// 重置当前线程 Hash 值,这就是【分段迁移机制】
h = advanceProbe(h);
}
// 【CASE2】: 运行到这说明 cells 还未初始化,as 为null
// 判断是否没有加锁,没有加锁就用 CAS 加锁
// 条件二判断是否其它线程在当前线程给 as 赋值之后修改了 cells,这里不是线程安全的判断
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 初始化标志,开始 【初始化 cells 数组】
boolean init = false;
try {
// 再次判断 cells == as 防止其它线程已经提前初始化了,当前线程再次初始化导致丢失数据
// 因为这里是【线程安全的,重新检查,经典 DCL】
if (cells == as) {
Cell[] rs = new Cell[2]; // 初始化数组大小为2
rs[h & 1] = new Cell(x); // 填充线程对应的cell
cells = rs;
init = true; // 初始化成功,标记置为 true
}
} finally {
cellsBusy = 0; // 解锁啊
}
if (init)
break; // 初始化成功直接跳出自旋
}
// 【CASE3】: 运行到这说明其他线程在初始化 cells,当前线程将值累加到 base,累加成功直接结束自旋
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}sum:获取最终结果通过 sum 整合,保证最终一致性,不保证强一致性
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
// 遍历 累加
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}ABA
ABA 问题:当进行获取主内存值时,该内存值在写入主内存时已经被修改了 N 次,但是最终又改成原来的值
其他线程先把 A 改成 B 又改回 A,主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,这时 CAS 虽然成功,但是过程存在问题
构造方法:
public AtomicStampedReference(V initialRef, int initialStamp):初始值和初始版本号
常用API:
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp):期望引用和期望版本号都一致才进行 CAS 修改数据public void set(V newReference, int newStamp):设置值和版本号public V getReference():返回引用的值public int getStamp():返回当前版本号
public static void main(String[] args) {
AtomicStampedReference<Integer> atomicReference = new AtomicStampedReference<>(100,1);
int startStamp = atomicReference.getStamp();
new Thread(() ->{
int stamp = atomicReference.getStamp();
atomicReference.compareAndSet(100, 101, stamp, stamp + 1);
stamp = atomicReference.getStamp();
atomicReference.compareAndSet(101, 100, stamp, stamp + 1);
},"t1").start();
new Thread(() ->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!atomicReference.compareAndSet(100, 200, startStamp, startStamp + 1)) {
System.out.println(atomicReference.getReference());//100
System.out.println(Thread.currentThread().getName() + "线程修改失败");
}
},"t2").start();
}Unsafe
Unsafe 是 CAS 的核心类,由于 Java 无法直接访问底层系统,需要通过本地(Native)方法来访问
Unsafe 类存在 sun.misc 包,其中所有方法都是 native 修饰的,都是直接调用操作系统底层资源执行相应的任务,基于该类可以直接操作特定的内存数据,其内部方法操作类似 C 的指针
模拟实现原子整数:
public static void main(String[] args) {
MyAtomicInteger atomicInteger = new MyAtomicInteger(10);
if (atomicInteger.compareAndSwap(20)) {
System.out.println(atomicInteger.getValue());
}
}
class MyAtomicInteger {
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
private volatile int value;
static {
try {
//Unsafe unsafe = Unsafe.getUnsafe()这样会报错,需要反射获取
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
// 获取 value 属性的内存地址,value 属性指向该地址,直接设置该地址的值可以修改 value 的值
VALUE_OFFSET = UNSAFE.objectFieldOffset(
MyAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
throw new RuntimeException();
}
}
public MyAtomicInteger(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public boolean compareAndSwap(int update) {
while (true) {
int prev = this.value;
int next = update;
// 当前对象 内存偏移量 期望值 更新值
if (UNSAFE.compareAndSwapInt(this, VALUE_OFFSET, prev, update)) {
System.out.println("CAS成功");
return true;
}
}
}
}final
原理
public class TestFinal {
final int a = 20;
}
字节码:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."< init>":()V
4: aload_0
5: bipush 20 // 将值直接放入栈中
7: putfield #2 // Field a:I
<-- 写屏障
10: returnfinal 变量的赋值通过 putfield 指令来完成,在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况
其他线程访问 final 修饰的变量
复制一份放入栈中直接访问,效率高
大于 short 最大值会将其复制到类的常量池,访问时从常量池获取
不可变
不可变:如果一个对象不能够修改其内部状态(属性),那么就是不可变对象
不可变对象线程安全的,不存在并发修改和可见性问题,是另一种避免竞争的方式
String 类也是不可变的,该类和类中所有属性都是 final 的
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
无写入方法(set)确保外部不能对内部属性进行修改
属性用 final 修饰保证了该属性是只读的,不能修改
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
//....
}- 更改 String 类数据时,会构造新字符串对象,生成新的 char[] value,通过创建副本对象来避免共享的方式称之为保护性拷贝
State
无状态:成员变量保存的数据也可以称为状态信息,无状态就是没有成员变量
Servlet 为了保证其线程安全,一般不为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的
Local
基本介绍
ThreadLocal 类用来提供线程内部的局部变量,这种变量在多线程环境下访问(通过 get 和 set 方法访问)时能保证各个线程的变量相对独立于其他线程内的变量,分配在堆内的 TLAB 中
ThreadLocal 实例通常来说都是 private static 类型的,属于一个线程的本地变量,用于关联线程和线程上下文。每个线程都会在 ThreadLocal 中保存一份该线程独有的数据,所以是线程安全的
ThreadLocal 作用:
线程并发:应用在多线程并发的场景下
传递数据:通过 ThreadLocal 实现在同一线程不同函数或组件中传递公共变量,减少传递复杂度
线程隔离:每个线程的变量都是独立的,不会互相影响
对比 synchronized:
| synchronized | ThreadLocal | |
|---|---|---|
| 原理 | 同步机制采用以时间换空间的方式,只提供了一份变量,让不同的线程排队访问 | ThreadLocal 采用以空间换时间的方式,为每个线程都提供了一份变量的副本,从而实现同时访问而相不干扰 |
| 侧重点 | 多个线程之间访问资源的同步 | 多线程中让每个线程之间的数据相互隔离 |
基本使用
常用方法
| 方法 | 描述 |
|---|---|
| ThreadLocal<>() | 创建 ThreadLocal 对象 |
| protected T initialValue() | 返回当前线程局部变量的初始值 |
| public void set( T value) | 设置当前线程绑定的局部变量 |
| public T get() | 获取当前线程绑定的局部变量 |
| public void remove() | 移除当前线程绑定的局部变量 |
public class MyDemo {
private static ThreadLocal<String> tl = new ThreadLocal<>();
private String content;
private String getContent() {
// 获取当前线程绑定的变量
return tl.get();
}
private void setContent(String content) {
// 变量content绑定到当前线程
tl.set(content);
}
public static void main(String[] args) {
MyDemo demo = new MyDemo();
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 设置数据
demo.setContent(Thread.currentThread().getName() + "的数据");
System.out.println("-----------------------");
System.out.println(Thread.currentThread().getName() + "--->" + demo.getContent());
}
});
thread.setName("线程" + i);
thread.start();
}
}
}应用场景
ThreadLocal 适用于下面两种场景:
每个线程需要有自己单独的实例
实例需要在多个方法中共享,但不希望被多线程共享
ThreadLocal 方案有两个突出的优势:
传递数据:保存每个线程绑定的数据,在需要的地方可以直接获取,避免参数直接传递带来的代码耦合问题
线程隔离:各线程之间的数据相互隔离却又具备并发性,避免同步方式带来的性能损失
ThreadLocal 用于数据连接的事务管理:
public class JdbcUtils {
// ThreadLocal对象,将connection绑定在当前线程中
private static final ThreadLocal<Connection> tl = new ThreadLocal();
// c3p0 数据库连接池对象属性
private static final ComboPooledDataSource ds = new ComboPooledDataSource();
// 获取连接
public static Connection getConnection() throws SQLException {
//取出当前线程绑定的connection对象
Connection conn = tl.get();
if (conn == null) {
//如果没有,则从连接池中取出
conn = ds.getConnection();
//再将connection对象绑定到当前线程中,非常重要的操作
tl.set(conn);
}
return conn;
}
// ...
}用 ThreadLocal 使 SimpleDateFormat 从独享变量变成单个线程变量:
public class ThreadLocalDateUtil {
private static ThreadLocal<DateFormat> threadLocal = new ThreadLocal<DateFormat>() {
@Override
protected DateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
public static Date parse(String dateStr) throws ParseException {
return threadLocal.get().parse(dateStr);
}
public static String format(Date date) {
return threadLocal.get().format(date);
}
}实现原理
底层结构
JDK8 以前:每个 ThreadLocal 都创建一个 Map,然后用线程作为 Map 的 key,要存储的局部变量作为 Map 的 value,达到各个线程的局部变量隔离的效果。这种结构会造成 Map 结构过大和内存泄露,因为 Thread 停止后无法通过 key 删除对应的数据

JDK8 以后:每个 Thread 维护一个 ThreadLocalMap,这个 Map 的 key 是 ThreadLocal 实例本身,value 是真正要存储的值
每个 Thread 线程内部都有一个 Map (ThreadLocalMap)
Map 里面存储 ThreadLocal 对象(key)和线程的私有变量(value)
Thread 内部的 Map 是由 ThreadLocal 维护的,由 ThreadLocal 负责向 map 获取和设置线程的变量值
对于不同的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本值,形成副本的隔离,互不干扰

JDK8 前后对比:
每个 Map 存储的 Entry 数量会变少,因为之前的存储数量由 Thread 的数量决定,现在由 ThreadLocal 的数量决定,在实际编程当中,往往 ThreadLocal 的数量要少于 Thread 的数量
当 Thread 销毁之后,对应的 ThreadLocalMap 也会随之销毁,能减少内存的使用,防止内存泄露
成员变量
Thread 类的相关属性:每一个线程持有一个 ThreadLocalMap 对象,存放由 ThreadLocal 和数据组成的 Entry 键值对
ThreadLocal.ThreadLocalMap threadLocals = null
计算 ThreadLocal 对象的哈希值:
private final int threadLocalHashCode = nextHashCode()
使用
threadLocalHashCode & (table.length - 1)计算当前 entry 需要存放的位置每创建一个 ThreadLocal 对象就会使用 nextHashCode 分配一个 hash 值给这个对象:
private static AtomicInteger nextHashCode = new AtomicInteger()
斐波那契数也叫黄金分割数,hash 的增量就是这个数字,带来的好处是 hash 分布非常均匀:
private static final int HASH_INCREMENT = 0x61c88647
成员方法
方法都是线程安全的,因为 ThreadLocal 属于一个线程的,ThreadLocal 中的方法,逻辑都是获取当前线程维护的 ThreadLocalMap 对象,然后进行数据的增删改查,没有指定初始值的 threadlcoal 对象默认赋值为 null
initialValue():返回该线程局部变量的初始值
延迟调用的方法,在执行 get 方法时才执行
该方法缺省(默认)实现直接返回一个 null
如果想要一个初始值,可以重写此方法, 该方法是一个
protected的方法,为了让子类覆盖而设计的
protected T initialValue() {
return null;
}- nextHashCode():计算哈希值,ThreadLocal 的散列方式称之为斐波那契散列,每次获取哈希值都会加上 HASH_INCREMENT,这样做可以尽量避免 hash 冲突,让哈希值能均匀的分布在 2 的 n 次方的数组中
private static int nextHashCode() {
// 哈希值自增一个 HASH_INCREMENT 数值
return nextHashCode.getAndAdd(HASH_INCREMENT);
}- set():修改当前线程与当前 threadlocal 对象相关联的线程局部变量
public void set(T value) {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的 ThreadLocalMap 对象
ThreadLocalMap map = getMap(t);
// 判断 map 是否存在
if (map != null)
// 调用 threadLocalMap.set 方法进行重写或者添加
map.set(this, value);
else
// map 为空,调用 createMap 进行 ThreadLocalMap 对象的初始化。参数1是当前线程,参数2是局部变量
createMap(t, value);
}
// 获取当前线程 Thread 对应维护的 ThreadLocalMap
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
// 创建当前线程Thread对应维护的ThreadLocalMap
void createMap(Thread t, T firstValue) {
// 【这里的 this 是调用此方法的 threadLocal】,创建一个新的 Map 并设置第一个数据
t.threadLocals = new ThreadLocalMap(this, firstValue);
}- get():获取当前线程与当前 ThreadLocal 对象相关联的线程局部变量
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// 如果此map存在
if (map != null) {
// 以当前的 ThreadLocal 为 key,调用 getEntry 获取对应的存储实体 e
ThreadLocalMap.Entry e = map.getEntry(this);
// 对 e 进行判空
if (e != null) {
// 获取存储实体 e 对应的 value值
T result = (T)e.value;
return result;
}
}
/*有两种情况有执行当前代码
第一种情况: map 不存在,表示此线程没有维护的 ThreadLocalMap 对象
第二种情况: map 存在, 但是【没有与当前 ThreadLocal 关联的 entry】,就会设置为默认值 */
// 初始化当前线程与当前 threadLocal 对象相关联的 value
return setInitialValue();
}
private T setInitialValue() {
// 调用initialValue获取初始化的值,此方法可以被子类重写, 如果不重写默认返回 null
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// 判断 map 是否初始化过
if (map != null)
// 存在则调用 map.set 设置此实体 entry,value 是默认的值
map.set(this, value);
else
// 调用 createMap 进行 ThreadLocalMap 对象的初始化中
createMap(t, value);
// 返回线程与当前 threadLocal 关联的局部变量
return value;
}- remove():移除当前线程与当前 threadLocal 对象相关联的线程局部变量
public void remove() {
// 获取当前线程对象中维护的 ThreadLocalMap 对象
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
// map 存在则调用 map.remove,this时当前ThreadLocal,以this为key删除对应的实体
m.remove(this);
}LocalMap
成员属性
ThreadLocalMap 是 ThreadLocal 的内部类,没有实现 Map 接口,用独立的方式实现了 Map 的功能,其内部 Entry 也是独立实现
// 初始化当前 map 内部散列表数组的初始长度 16
private static final int INITIAL_CAPACITY = 16;
// 存放数据的table,数组长度必须是2的整次幂。
private Entry[] table;
// 数组里面 entrys 的个数,可以用于判断 table 当前使用量是否超过阈值
private int size = 0;
// 进行扩容的阈值,表使用量大于它的时候进行扩容。
private int threshold;
存储结构 Entry:
Entry 继承 WeakReference,key 是弱引用,目的是将 ThreadLocal 对象的生命周期和线程生命周期解绑
Entry 限制只能用 ThreadLocal 作为 key,key 为 null (entry.get() == null) 意味着 key 不再被引用,entry 也可以从 table 中清除
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
// this.referent = referent = key;
super(k);
value = v;
}
}构造方法:延迟初始化的,线程第一次存储 threadLocal - value 时才会创建 threadLocalMap 对象
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化table,创建一个长度为16的Entry数组
table = new Entry[INITIAL_CAPACITY];
// 【寻址算法】计算索引
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 创建 entry 对象,存放到指定位置的 slot 中
table[i] = new Entry(firstKey, firstValue);
// 数据总量是 1
size = 1;
// 将阈值设置为 (当前数组长度 * 2)/ 3。
setThreshold(INITIAL_CAPACITY);
}成员方法
set():添加数据,ThreadLocalMap 使用线性探测法来解决哈希冲突
该方法会一直探测下一个地址,直到有空的地址后插入,若插入后 Map 数量超过阈值,数组会扩容为原来的 2 倍
假设当前 table 长度为16,计算出来 key 的 hash 值为 14,如果 table[14] 上已经有值,并且其 key 与当前 key 不一致,那么就发生了 hash 冲突,这个时候将 14 加 1 得到 15,取 table[15] 进行判断,如果还是冲突会回到 0,取 table[0],以此类推,直到可以插入,可以把 Entry[] table 看成一个环形数组
线性探测法会出现堆积问题,可以采取平方探测法解决
在探测过程中 ThreadLocal 会复用 key 为 null 的脏 Entry 对象,并进行垃圾清理,防止出现内存泄漏
private void set(ThreadLocal<?> key, Object value) {
// 获取散列表
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
int len = tab.length;
// 哈希寻址
int i = key.threadLocalHashCode & (len-1);
// 使用线性探测法向后查找元素,碰到 entry 为空时停止探测
for (ThreadLocal.ThreadLocalMap.Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
// 获取当前元素 key
ThreadLocal<?> k = e.get();
// ThreadLocal 对应的 key 存在,【直接覆盖之前的值】
if (k == key) {
e.value = value;
return;
}
// 【这两个条件谁先成立不一定,所以 replaceStaleEntry 中还需要判断 k == key 的情况】
// key 为 null,但是值不为 null,说明之前的 ThreadLocal 对象已经被回收了,当前是【过期数据】
if (k == null) {
// 【碰到一个过期的 slot,当前数据复用该槽位,替换过期数据】
// 这个方法还进行了垃圾清理动作,防止内存泄漏
replaceStaleEntry(key, value, i);
return;
}
}
// 逻辑到这说明碰到 slot == null 的位置,则在空元素的位置创建一个新的 Entry
tab[i] = new Entry(key, value);
// 数量 + 1
int sz = ++size;
// 【做一次启发式清理】,如果没有清除任何 entry 并且【当前使用量达到了负载因子所定义,那么进行 rehash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 扩容
rehash();
}
// 获取【环形数组】的下一个索引
private static int nextIndex(int i, int len) {
// 索引越界后从 0 开始继续获取
return ((i + 1 < len) ? i + 1 : 0);
}
// 在指定位置插入指定的数据
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
// 获取散列表
Entry[] tab = table;
int len = tab.length;
Entry e;
// 探测式清理的开始下标,默认从当前 staleSlot 开始
int slotToExpunge = staleSlot;
// 以当前 staleSlot 开始【向前迭代查找】,找到索引靠前过期数据,找到以后替换 slotToExpunge 值
// 【保证在一个区间段内,从最前面的过期数据开始清理】
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// 以 staleSlot 【向后去查找】,直到碰到 null 为止,还是线性探测
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
// 获取当前节点的 key
ThreadLocal<?> k = e.get();
// 条件成立说明是【替换逻辑】
if (k == key) {
e.value = value;
// 因为本来要在 staleSlot 索引处插入该数据,现在找到了i索引处的key与数据一致
// 但是 i 位置距离正确的位置更远,因为是向后查找,所以还是要在 staleSlot 位置插入当前 entry
// 然后将 table[staleSlot] 这个过期数据放到当前循环到的 table[i] 这个位置,
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 条件成立说明向前查找过期数据并未找到过期的 entry,但 staleSlot 位置已经不是过期数据了,i 位置才是
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 【清理过期数据,expungeStaleEntry 探测式清理,cleanSomeSlots 启发式清理】
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 条件成立说明当前遍历的 entry 是一个过期数据,并且该位置前面也没有过期数据
if (k == null && slotToExpunge == staleSlot)
// 探测式清理过期数据的开始下标修改为当前循环的 index,因为 staleSlot 会放入要添加的数据
slotToExpunge = i;
}
// 向后查找过程中并未发现 k == key 的 entry,说明当前是一个【取代过期数据逻辑】
// 删除原有的数据引用,防止内存泄露
tab[staleSlot].value = null;
// staleSlot 位置添加数据,【上面的所有逻辑都不会更改 staleSlot 的值】
tab[staleSlot] = new Entry(key, value);
// 条件成立说明除了 staleSlot 以外,还发现其它的过期 slot,所以要【开启清理数据的逻辑】
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
private static int prevIndex(int i, int len) {
// 形成一个环绕式的访问,头索引越界后置为尾索引
return ((i - 1 >= 0) ? i - 1 : len - 1);
}- getEntry():ThreadLocal 的 get 方法以当前的 ThreadLocal 为 key,调用 getEntry 获取对应的存储实体 e
private Entry getEntry(ThreadLocal<?> key) {
// 哈希寻址
int i = key.threadLocalHashCode & (table.length - 1);
// 访问散列表中指定指定位置的 slot
Entry e = table[i];
// 条件成立,说明 slot 有值并且 key 就是要寻找的 key,直接返回
if (e != null && e.get() == key)
return e;
else
// 进行线性探测
return getEntryAfterMiss(key, i, e);
}
// 线性探测寻址
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
// 获取散列表
Entry[] tab = table;
int len = tab.length;
// 开始遍历,碰到 slot == null 的情况,搜索结束
while (e != null) {
// 获取当前 slot 中 entry 对象的 key
ThreadLocal<?> k = e.get();
// 条件成立说明找到了,直接返回
if (k == key)
return e;
if (k == null)
// 过期数据,【探测式过期数据回收】
expungeStaleEntry(i);
else
// 更新 index 继续向后走
i = nextIndex(i, len);
// 获取下一个槽位中的 entry
e = tab[i];
}
// 说明当前区段没有找到相应数据
// 【因为存放数据是线性的向后寻找槽位,都是紧挨着的,不可能越过一个 空槽位 在后面放】,可以减少遍历的次数
return null;
}- rehash():触发一次全量清理,如果数组长度大于等于长度的
2/3 * 3/4 = 1/2,则进行 resize
private void rehash() {
// 清楚当前散列表内的【所有】过期的数据
expungeStaleEntries();
// threshold = len * 2 / 3,就是 2/3 * (1 - 1/4)
if (size >= threshold - threshold / 4)
resize();
}
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
// 【遍历所有的槽位,清理过期数据】
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}Entry **数组为扩容为原来的 2 倍** ,重新计算 key 的散列值,如果遇到 key 为 null 的情况,会将其 value 也置为 null,帮助 GC
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 新数组的长度是老数组的二倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
// 统计新table中的entry数量
int count = 0;
// 遍历老表,进行【数据迁移】
for (int j = 0; j < oldLen; ++j) {
// 访问老表的指定位置的 entry
Entry e = oldTab[j];
// 条件成立说明老表中该位置有数据,可能是过期数据也可能不是
if (e != null) {
ThreadLocal<?> k = e.get();
// 过期数据
if (k == null) {
e.value = null; // Help the GC
} else {
// 非过期数据,在新表中进行哈希寻址
int h = k.threadLocalHashCode & (newLen - 1);
// 【线程探测】
while (newTab[h] != null)
h = nextIndex(h, newLen);
// 将数据存放到新表合适的 slot 中
newTab[h] = e;
count++;
}
}
}
// 设置下一次触发扩容的指标:threshold = len * 2 / 3;
setThreshold(newLen);
size = count;
// 将扩容后的新表赋值给 threadLocalMap 内部散列表数组引用
table = newTab;
}- remove():删除 Entry
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
// 哈希寻址
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
// 找到了对应的 key
if (e.get() == key) {
// 设置 key 为 null
e.clear();
// 探测式清理
expungeStaleEntry(i);
return;
}
}
}清理方法
- 探测式清理:沿着开始位置向后探测清理过期数据,沿途中碰到未过期数据则将此数据 rehash 在 table 数组中的定位,重定位后的元素理论上更接近
i = entry.key & (table.length - 1),让数据的排列更紧凑,会优化整个散列表查询性能
// table[staleSlot] 是一个过期数据,以这个位置开始继续向后查找过期数据
private int expungeStaleEntry(int staleSlot) {
// 获取散列表和数组长度
Entry[] tab = table;
int len = tab.length;
// help gc,先把当前过期的 entry 置空,在取消对 entry 的引用
tab[staleSlot].value = null;
tab[staleSlot] = null;
// 数量-1
size--;
Entry e;
int i;
// 从 staleSlot 开始向后遍历,直到碰到 slot == null 结束,【区间内清理过期数据】
for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 当前 entry 是过期数据
if (k == null) {
// help gc
e.value = null;
tab[i] = null;
size--;
} else {
// 当前 entry 不是过期数据的逻辑,【rehash】
// 重新计算当前 entry 对应的 index
int h = k.threadLocalHashCode & (len - 1);
// 条件成立说明当前 entry 存储时发生过 hash 冲突,向后偏移过了
if (h != i) {
// 当前位置置空
tab[i] = null;
// 以正确位置 h 开始,向后查找第一个可以存放 entry 的位置
while (tab[h] != null)
h = nextIndex(h, len);
// 将当前元素放入到【距离正确位置更近的位置,有可能就是正确位置】
tab[h] = e;
}
}
}
// 返回 slot = null 的槽位索引,图例是 7,这个索引代表【索引前面的区间已经清理完成垃圾了】
return i;
}

- 启发式清理:向后循环扫描过期数据,发现过期数据调用探测式清理方法,如果连续几次的循环都没有发现过期数据,就停止扫描
// i 表示启发式清理工作开始位置,一般是空 slot,n 一般传递的是 table.length
private boolean cleanSomeSlots(int i, int n) {
// 表示启发式清理工作是否清除了过期数据
boolean removed = false;
// 获取当前 map 的散列表引用
Entry[] tab = table;
int len = tab.length;
do {
// 获取下一个索引,因为探测式返回的 slot 为 null
i = nextIndex(i, len);
Entry e = tab[i];
// 条件成立说明是过期的数据,key 被 gc 了
if (e != null && e.get() == null) {
// 【发现过期数据重置 n 为数组的长度】
n = len;
// 表示清理过过期数据
removed = true;
// 以当前过期的 slot 为开始节点 做一次探测式清理工作
i = expungeStaleEntry(i);
}
// 假设 table 长度为 16
// 16 >>> 1 ==> 8,8 >>> 1 ==> 4,4 >>> 1 ==> 2,2 >>> 1 ==> 1,1 >>> 1 ==> 0
// 连续经过这么多次循环【没有扫描到过期数据】,就停止循环,扫描到空 slot 不算,因为不是过期数据
} while ((n >>>= 1) != 0);
// 返回清除标记
return removed;
}内存泄漏
Memory leak:内存泄漏是指程序中动态分配的堆内存由于某种原因未释放或无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果,内存泄漏的堆积终将导致内存溢出
如果 key 使用强引用:使用完 ThreadLocal ,threadLocal Ref 被回收,但是 threadLocalMap 的 Entry 强引用了 threadLocal,造成 threadLocal 无法被回收,无法完全避免内存泄漏

如果 key 使用弱引用:使用完 ThreadLocal ,threadLocal Ref 被回收,ThreadLocalMap 只持有 ThreadLocal 的弱引用,所以threadlocal 也可以被回收,此时 Entry 中的 key = null。但没有手动删除这个 Entry 或者 CurrentThread 依然运行,依然存在强引用链,value 不会被回收,而这块 value 永远不会被访问到,也会导致 value 内存泄漏

两个主要原因:
没有手动删除这个 Entry
CurrentThread 依然运行
根本原因:ThreadLocalMap 是 Thread的一个属性,生命周期跟 Thread 一样长,如果没有手动删除对应 Entry 就会导致内存泄漏
解决方法:使用完 ThreadLocal 中存储的内容后将它 remove 掉就可以
ThreadLocal 内部解决方法:在 ThreadLocalMap 中的 set/getEntry 方法中,通过线性探测法对 key 进行判断,如果 key 为 null(ThreadLocal 为 null)会对 Entry 进行垃圾回收。所以使用弱引用比强引用多一层保障,就算不调用 remove,也有机会进行 GC
变量传递
基本使用
父子线程:创建子线程的线程是父线程,比如实例中的 main 线程就是父线程
ThreadLocal 中存储的是线程的局部变量,如果想实现线程间局部变量传递可以使用 InheritableThreadLocal 类
public static void main(String[] args) {
ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
threadLocal.set("父线程设置的值");
new Thread(() -> System.out.println("子线程输出:" + threadLocal.get())).start();
}
// 子线程输出:父线程设置的值实现原理
InheritableThreadLocal 源码:
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
protected T childValue(T parentValue) {
return parentValue;
}
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}实现父子线程间的局部变量共享需要追溯到 Thread 对象的构造方法:
private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc,
// 该参数默认是 true
boolean inheritThreadLocals) {
// ...
Thread parent = currentThread();
// 判断父线程(创建子线程的线程)的 inheritableThreadLocals 属性不为 null
if (inheritThreadLocals && parent.inheritableThreadLocals != null) {
// 复制父线程的 inheritableThreadLocals 属性,实现父子线程局部变量共享
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
// ..
}
// 【本质上还是创建 ThreadLocalMap,只是把父类中的可继承数据设置进去了】
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
private ThreadLocalMap(ThreadLocalMap parentMap) {
// 获取父线程的哈希表
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
// 【逐个复制父线程 ThreadLocalMap 中的数据】
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
// 调用的是 InheritableThreadLocal#childValue(T parentValue)
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
// 线性探测
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}线程池
基本概述
线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作
线程池作用:
降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务
池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销
阻塞队列
基本介绍
有界队列和无界队列:
有界队列:有固定大小的队列,比如设定了固定大小的 LinkedBlockingQueue,又或者大小为 0
无界队列:没有设置固定大小的队列,这些队列可以直接入队,直到溢出(超过 Integer.MAX_VALUE),所以相当于无界
java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:FIFO 队列
ArrayBlockQueue:由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表结构组成的无界(默认大小 Integer.MAX_VALUE)的阻塞队列
PriorityBlockQueue:支持优先级排序的无界阻塞队列
DelayedWorkQueue:使用优先级队列实现的延迟无界阻塞队列
SynchronousQueue:不存储元素的阻塞队列,每一个生产线程会阻塞到有一个 put 的线程放入元素为止
LinkedTransferQueue:由链表结构组成的无界阻塞队列
LinkedBlockingDeque:由链表结构组成的双向阻塞队列
与普通队列(LinkedList、ArrayList等)的不同点在于阻塞队列中阻塞添加和阻塞删除方法,以及线程安全:
阻塞添加 put():当阻塞队列元素已满时,添加队列元素的线程会被阻塞,直到队列元素不满时才重新唤醒线程执行
阻塞删除 take():在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般会返回被删除的元素)
核心方法
| 方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入(尾) | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除(头) | remove() | poll() | take() | poll(time,unit) |
| 检查(队首元素) | element() | peek() | 不可用 | 不可用 |
抛出异常组:
当阻塞队列满时:在往队列中 add 插入元素会抛出 IIIegalStateException: Queue full
当阻塞队列空时:再往队列中 remove 移除元素,会抛出 NoSuchException
特殊值组:
插入方法:成功 true,失败 false
移除方法:成功返回出队列元素,队列没有就返回 null
阻塞组:
当阻塞队列满时,生产者继续往队列里 put 元素,队列会一直阻塞生产线程直到队列有空间 put 数据或响应中断退出
当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程直到队列中有可用元素
超时退出:当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出
链表队列
入队出队
LinkedBlockingQueue 源码:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
/**
* 下列三种情况之一
* - 真正的后继节点
* - 自己, 发生在出队时
* - null, 表示是没有后继节点, 是尾节点了
*/
Node<E> next;
Node(E x) { item = x; }
}
}入队:尾插法
- 初始化链表
last = head = new Node<E>(null),Dummy 节点用来占位,item 为 null
public LinkedBlockingQueue(int capacity) {
// 默认是 Integer.MAX_VALUE
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}- 当一个节点入队:
private void enqueue(Node<E> node) {
// 从右向左计算
last = last.next = node;
}
- 再来一个节点入队
last = last.next = node
出队:出队头节点,FIFO
- 出队源码:
private E dequeue() {
Node<E> h = head;
// 获取临头节点
Node<E> first = h.next;
// 自己指向自己,help GC
h.next = h;
head = first;
// 出队的元素
E x = first.item;
// 【当前节点置为 Dummy 节点】
first.item = null;
return x;
}h = head→first = h.next

h.next = h→head = first

- `first.item = null`:当前节点置为 Dummy 节点
加锁分析
用了两把锁和 dummy 节点:
用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
消费者与消费者线程仍然串行
生产者与生产者线程仍然串行
线程安全分析:
当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全,两把锁保证了入队和出队没有竞争
当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
// 用于 put(阻塞) offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition(); // 阻塞等待不满,说明已经满了
// 用于 take(阻塞) poll(非阻塞)
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition(); // 阻塞等待不空,说明已经是空的入队出队:
- put 操作:
public void put(E e) throws InterruptedException {
// 空指针异常
if (e == null) throw new NullPointerException();
int c = -1;
// 把待添加的元素封装为 node 节点
Node<E> node = new Node<E>(e);
// 获取全局生产锁
final ReentrantLock putLock = this.putLock;
// count 用来维护元素计数
final AtomicInteger count = this.count;
// 获取可打断锁,会抛出异常
putLock.lockInterruptibly();
try {
// 队列满了等待
while (count.get() == capacity) {
// 【等待队列不满时,就可以生产数据】,线程处于 Waiting
notFull.await();
}
// 有空位, 入队且计数加一,尾插法
enqueue(node);
// 返回自增前的数字
c = count.getAndIncrement();
// put 完队列还有空位, 唤醒其他生产 put 线程,唤醒一个减少竞争
if (c + 1 < capacity)
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
// c自增前是0,说明生产了一个元素,唤醒一个 take 线程
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 调用 notEmpty.signal(),而不是 notEmpty.signalAll() 是为了减少竞争,因为只剩下一个元素
notEmpty.signal();
} finally {
takeLock.unlock();
}
}- take 操作:
public E take() throws InterruptedException {
E x;
int c = -1;
// 元素个数
final AtomicInteger count = this.count;
// 获取全局消费锁
final ReentrantLock takeLock = this.takeLock;
// 可打断锁
takeLock.lockInterruptibly();
try {
// 没有元素可以出队
while (count.get() == 0) {
// 【阻塞等待队列不空,就可以消费数据】,线程处于 Waiting
notEmpty.await();
}
// 出队,计数减一,FIFO,出队头节点
x = dequeue();
// 返回自减前的数字
c = count.getAndDecrement();
// 队列还有元素
if (c > 1)
// 唤醒一个消费take线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
// c 是消费前的数据,消费前满了,消费一个后还剩一个空位,唤醒生产线程
if (c == capacity)
// 调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争
signalNotFull();
return x;
}性能比较
主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较:
Linked 支持有界,Array 强制有界
Linked 实现是链表,Array 实现是数组
Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
Linked 两把锁,Array 一把锁
同步队列
成员属性
SynchronousQueue 是一个不存储元素的 BlockingQueue,每一个生产者必须阻塞匹配到一个消费者
成员变量:
运行当前程序的平台拥有 CPU 的数量:
static final int NCPUS = Runtime.getRuntime().availableProcessors()指定超时时间后,当前线程最大自旋次数:
// 只有一个 CPU 时自旋次数为 0,所有程序都是串行执行,多核 CPU 时自旋 32 次是一个经验值
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;自旋的原因:线程挂起唤醒需要进行上下文切换,涉及到用户态和内核态的转变,是非常消耗资源的。自旋期间线程会一直检查自己的状态是否被匹配到,如果自旋期间被匹配到,那么直接就返回了,如果自旋次数达到某个指标后,还是会将当前线程挂起
未指定超时时间,当前线程最大自旋次数:
static final int maxUntimedSpins = maxTimedSpins * 16; // maxTimedSpins 的 16 倍指定超时限制的阈值,小于该值的线程不会被挂起:
static final long spinForTimeoutThreshold = 1000L; // 纳秒超时时间设置的小于该值,就会被禁止挂起,阻塞再唤醒的成本太高,不如选择自旋空转
转换器:
private transient volatile Transferer<E> transferer;
abstract static class Transferer<E> {
/**
* 参数一:可以为 null,null 时表示这个请求是一个 REQUEST 类型的请求,反之是一个 DATA 类型的请求
* 参数二:如果为 true 表示指定了超时时间,如果为 false 表示不支持超时,会一直阻塞到匹配或者被打断
* 参数三:超时时间限制,单位是纳秒
* 返回值:返回值如果不为 null 表示匹配成功,DATA 类型的请求返回当前线程 put 的数据
* 如果返回 null,表示请求超时或被中断
*/
abstract E transfer(E e, boolean timed, long nanos);
}- 构造方法:
public SynchronousQueue(boolean fair) {
// fair 默认 false
// 非公平模式实现的数据结构是栈,公平模式的数据结构是队列
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}- 成员方法:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
public E poll() {
return transferer.transfer(null, true, 0);
}非公实现
TransferStack 是非公平的同步队列,因为所有的请求都被压入栈中,栈顶的元素会最先得到匹配,造成栈底的等待线程饥饿
TransferStack 类成员变量:
- 请求类型:
// 表示 Node 类型为请求类型
static final int REQUEST = 0;
// 表示 Node类 型为数据类型
static final int DATA = 1;
// 表示 Node 类型为匹配中类型
// 假设栈顶元素为 REQUEST-NODE,当前请求类型为 DATA,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】
// 假设栈顶元素为 DATA-NODE,当前请求类型为 REQUEST,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】
static final int FULFILLING = 2;栈顶元素:
volatile SNode head;
内部类 SNode:
- 成员变量:
static final class SNode {
// 指向下一个栈帧
volatile SNode next;
// 与当前 node 匹配的节点
volatile SNode match;
// 假设当前node对应的线程自旋期间未被匹配成功,那么node对应的线程需要挂起,
// 挂起前 waiter 保存对应的线程引用,方便匹配成功后,被唤醒。
volatile Thread waiter;
// 数据域,不为空表示当前 Node 对应的请求类型为 DATA 类型,反之则表示 Node 为 REQUEST 类型
Object item;
// 表示当前Node的模式 【DATA/REQUEST/FULFILLING】
int mode;
}- 构造方法:
SNode(Object item) {
this.item = item;
}- 设置方法:设置 Node 对象的 next 字段,此处对 CAS 进行了优化,提升了 CAS 的效率
boolean casNext(SNode cmp, SNode val) {
//【优化:cmp == next】,可以提升一部分性能。 cmp == next 不相等,就没必要走 cas指令。
return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}- 匹配方法:
boolean tryMatch(SNode s) {
// 当前 node 尚未与任何节点发生过匹配,CAS 设置 match 字段为 s 节点,表示当前 node 已经被匹配
if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
// 当前 node 如果自旋结束,会 park 阻塞,阻塞前将 node 对应的 Thread 保留到 waiter 字段
// 获取当前 node 对应的阻塞线程
Thread w = waiter;
// 条件成立说明 node 对应的 Thread 正在阻塞
if (w != null) {
waiter = null;
// 使用 unpark 方式唤醒线程
LockSupport.unpark(w);
}
return true;
}
// 匹配成功返回 true
return match == s;
}- 取消方法:
// 取消节点的方法
void tryCancel() {
// match 字段指向自己,表示这个 node 是取消状态,取消状态的 node,最终会被强制移除出栈
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}TransferStack 类成员方法:
- snode():填充节点方法
static SNode snode(SNode s, Object e, SNode next, int mode) {
// 引用指向空时,snode 方法会创建一个 SNode 对象
if (s == null) s = new SNode(e);
// 填充数据
s.mode = mode;
s.next = next;
return s;
}- transfer():核心方法,请求匹配出栈,不匹配阻塞
E transfer(E e, boolean timed, long nanos) {
// 包装当前线程的 node
SNode s = null;
// 根据元素判断当前的请求类型
int mode = (e == null) ? REQUEST : DATA;
// 自旋
for (;;) {
// 获取栈顶指针
SNode h = head;
// 【CASE1】:当前栈为空或者栈顶 node 模式与当前请求模式一致无法匹配,做入栈操作
if (h == null || h.mode == mode) {
// 当前请求是支持超时的,但是 nanos <= 0 说明这个请求不支持 “阻塞等待”
if (timed && nanos <= 0) {
// 栈顶元素是取消状态
if (h != null && h.isCancelled())
// 栈顶出栈,设置新的栈顶
casHead(h, h.next);
else
// 表示【匹配失败】
return null;
// 入栈
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 等待被匹配的逻辑,正常情况返回匹配的节点;取消情况返回当前节点,就是 s
SNode m = awaitFulfill(s, timed, nanos);
// 说明当前 node 是【取消状态】
if (m == s) {
// 将取消节点出栈
clean(s);
return null;
}
// 执行到这说明【匹配成功】了
// 栈顶有节点并且 匹配节点还未出栈,需要协助出栈
if ((h = head) != null && h.next == s)
casHead(h, s.next);
// 当前 node 模式为 REQUEST 类型,返回匹配节点的 m.item 数据域
// 当前 node 模式为 DATA 类型:返回 node.item 数据域,当前请求提交的数据 e
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 【CASE2】:逻辑到这说明请求模式不一致,如果栈顶不是 FULFILLING 说明没被其他节点匹配,【当前可以匹配】
} else if (!isFulfilling(h.mode)) {
// 头节点是取消节点,match 指向自己,协助出栈
if (h.isCancelled())
casHead(h, h.next);
// 入栈当前请求的节点
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
// m 是 s 的匹配的节点
SNode m = s.next;
// m 节点在 awaitFulfill 方法中被中断,clean 了自己
if (m == null) {
// 清空栈
casHead(s, null);
s = null;
// 返回到外层自旋中
break;
}
// 获取匹配节点的下一个节点
SNode mn = m.next;
// 尝试匹配,【匹配成功】,则将 fulfilling 和 m 一起出栈,并且唤醒被匹配的节点的线程
if (m.tryMatch(s)) {
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 匹配失败,出栈 m
s.casNext(m, mn);
}
}
// 【CASE3】:栈顶模式为 FULFILLING 模式,表示【栈顶和栈顶下面的节点正在发生匹配】,当前请求需要做协助工作
} else {
// h 表示的是 fulfilling 节点,m 表示 fulfilling 匹配的节点
SNode m = h.next;
if (m == null)
// 清空栈
casHead(h, null);
else {
SNode mn = m.next;
// m 和 h 匹配,唤醒 m 中的线程
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}- awaitFulfill():阻塞当前线程等待被匹配,返回匹配的节点,或者被取消的节点
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 等待的截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 表示当前请求线程在下面的 for(;;) 自旋检查的次数
int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋检查逻辑:是否匹配、是否超时、是否被中断
for (;;) {
// 当前线程收到中断信号,需要设置 node 状态为取消状态
if (w.isInterrupted())
s.tryCancel();
// 获取与当前 s 匹配的节点
SNode m = s.match;
if (m != null)
// 可能是正常的匹配的,也可能是取消的
return m;
// 执行了超时限制就判断是否超时
if (timed) {
nanos = deadline - System.nanoTime();
// 【超时了,取消节点】
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 说明当前线程还可以进行自旋检查
if (spins > 0)
// 自旋一次 递减 1
spins = shouldSpin(s) ? (spins - 1) : 0;
// 说明没有自旋次数了
else if (s.waiter == null)
//【把当前 node 对应的 Thread 保存到 node.waiter 字段中,要阻塞了】
s.waiter = w;
// 没有超时限制直接阻塞
else if (!timed)
LockSupport.park(this);
// nanos > 1000 纳秒的情况下,才允许挂起当前线程
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
boolean shouldSpin(SNode s) {
// 获取栈顶
SNode h = head;
// 条件一成立说明当前 s 就是栈顶,允许自旋检查
// 条件二成立说明当前 s 节点自旋检查期间,又来了一个与当前 s 节点匹配的请求,双双出栈后条件会成立
// 条件三成立前提当前 s 不是栈顶元素,并且当前栈顶正在匹配中,这种状态栈顶下面的元素,都允许自旋检查
return (h == s || h == null || isFulfilling(h.mode));
}- clear():指定节点出栈
void clean(SNode s) {
// 清空数据域和关联线程
s.item = null;
s.waiter = null;
// 获取取消节点的下一个节点
SNode past = s.next;
// 判断后继节点是不是取消节点,是就更新 past
if (past != null && past.isCancelled())
past = past.next;
SNode p;
// 从栈顶开始向下检查,【将栈顶开始向下的 取消状态 的节点全部清理出去】,直到碰到 past 或者不是取消状态为止
while ((p = head) != null && p != past && p.isCancelled())
// 修改的是内存地址对应的值,p 指向该内存地址所以数据一直在变化
casHead(p, p.next);
// 说明中间遇到了不是取消状态的节点,继续迭代下去
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}公平实现
TransferQueue 是公平的同步队列,采用 FIFO 的队列实现,请求节点与队尾模式不同,需要与队头发生匹配
TransferQueue 类成员变量:
指向队列的 dummy 节点:
transient volatile QNode head;指向队列的尾节点:
transient volatile QNode tail;被清理节点的前驱节点:
transient volatile QNode cleanMe;入队操作是两步完成的,第一步是 t.next = newNode,第二步是 tail = newNode,所以队尾节点出队,是一种非常特殊的情况
TransferQueue 内部类:
- QNode:
static final class QNode {
// 指向当前节点的下一个节点
volatile QNode next;
// 数据域,Node 代表的是 DATA 类型 item 表示数据,否则 Node 代表的 REQUEST 类型,item == null
volatile Object item;
// 假设当前 node 对应的线程自旋期间未被匹配成功,那么 node 对应的线程需要挂起,
// 挂起前 waiter 保存对应的线程引用,方便匹配成功后被唤醒。
volatile Thread waiter;
// true 当前 Node 是一个 DATA 类型,false 表示当前 Node 是一个 REQUEST 类型
final boolean isData;
// 构建方法
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// 尝试取消当前 node,取消状态的 node 的 item 域指向自己
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
// 判断当前 node 是否为取消状态
boolean isCancelled() {
return item == this;
}
// 判断当前节点是否 “不在” 队列内,当 next 指向自己时,说明节点已经出队。
boolean isOffList() {
return next == this;
}
}TransferQueue 类成员方法:
- 设置头尾节点:
void advanceHead(QNode h, QNode nh) {
// 设置头指针指向新的节点,
if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
// 老的头节点出队
h.next = h;
}
void advanceTail(QNode t, QNode nt) {
if (tail == t)
// 更新队尾节点为新的队尾
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}- transfer():核心方法
E transfer(E e, boolean timed, long nanos) {
// s 指向当前请求对应的 node
QNode s = null;
// 是否是 DATA 类型的请求
boolean isData = (e != null);
// 自旋
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null)
continue;
// head 和 tail 同时指向 dummy 节点,说明是空队列
// 队尾节点与当前请求类型是一致的情况,说明阻塞队列中都无法匹配,
if (h == t || t.isData == isData) {
// 获取队尾 t 的 next 节点
QNode tn = t.next;
// 多线程环境中其他线程可能修改尾节点
if (t != tail)
continue;
// 已经有线程入队了,更新 tail
if (tn != null) {
advanceTail(t, tn);
continue;
}
// 允许超时,超时时间小于 0,这种方法不支持阻塞等待
if (timed && nanos <= 0)
return null;
// 创建 node 的逻辑
if (s == null)
s = new QNode(e, isData);
// 将 node 添加到队尾
if (!t.casNext(null, s))
continue;
// 更新队尾指针
advanceTail(t, s);
// 当前节点 等待匹配....
Object x = awaitFulfill(s, e, timed, nanos);
// 说明【当前 node 状态为 取消状态】,需要做出队逻辑
if (x == s) {
clean(t, s);
return null;
}
// 说明当前 node 仍然在队列内,匹配成功,需要做出队逻辑
if (!s.isOffList()) {
// t 是当前 s 节点的前驱节点,判断 t 是不是头节点,是就更新 dummy 节点为 s 节点
advanceHead(t, s);
// s 节点已经出队,所以需要把它的 item 域设置为它自己,表示它是个取消状态
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
// 队尾节点与当前请求节点【互补匹配】
} else {
// h.next 节点,【请求节点与队尾模式不同,需要与队头发生匹配】,TransferQueue 是一个【公平模式】
QNode m = h.next;
// 并发导致其他线程修改了队尾节点,或者已经把 head.next 匹配走了
if (t != tail || m == null || h != head)
continue;
// 获取匹配节点的数据域保存到 x
Object x = m.item;
// 判断是否匹配成功
if (isData == (x != null) ||
x == m ||
!m.casItem(x, e)) {
advanceHead(h, m);
continue;
}
// 【匹配完成】,将头节点出队,让这个新的头结点成为 dummy 节点
advanceHead(h, m);
// 唤醒该匹配节点的线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}- awaitFulfill():阻塞当前线程等待被匹配
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 表示等待截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自选检查的次数
int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 被打断就取消节点
if (w.isInterrupted())
s.tryCancel(e);
// 获取当前 Node 数据域
Object x = s.item;
// 当前请求为 DATA 模式时:e 请求带来的数据
// s.item 修改为 this,说明当前 QNode 对应的线程 取消状态
// s.item 修改为 null 表示已经有匹配节点了,并且匹配节点拿走了 item 数据
// 当前请求为 REQUEST 模式时:e == null
// s.item 修改为 this,说明当前 QNode 对应的线程 取消状态
// s.item != null 且 item != this 表示当前 REQUEST 类型的 Node 已经匹配到 DATA 了
if (x != e)
return x;
// 超时检查
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋次数减一
if (spins > 0)
--spins;
// 没有自旋次数了,把当前线程封装进去 waiter
else if (s.waiter == null)
s.waiter = w;
// 阻塞
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}TODO HERE
操作Pool
创建方式
Executor
存放线程的容器:
private final HashSet<Worker> workers = new HashSet<Worker>();
构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)参数介绍:
corePoolSize:核心线程数,定义了最小可以同时运行的线程数量
maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值又叫救急线程数
keepAliveTime:救急线程最大存活时间,当线程池中的线程数量大于
corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等到keepAliveTime时间超过销毁unit:
keepAliveTime参数的时间单位workQueue:阻塞队列,存放被提交但尚未被执行的任务
threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字
handler:拒绝策略,线程到达最大线程数仍有新任务时会执行拒绝策略
RejectedExecutionHandler 下有 4 个实现类:
AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略
CallerRunsPolicy:让调用者运行的调节机制,将某些任务回退到调用者,从而降低新任务的流量
DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务
补充:其他框架拒绝策略
Dubbo:在抛出 RejectedExecutionException 异常前记录日志,并 dump 线程栈信息,方便定位问题
Netty:创建一个新线程来执行任务
ActiveMQ:带超时等待(60s)尝试放入队列
PinPoint:它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
工作原理:

创建线程池,这时没有创建线程(懒惰),等待提交过来的任务请求,调用 execute 方法才会创建线程
当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务
如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列
如果这时队列满了且正在运行的线程数量还小于 maximumPoolSize,那么会创建非核心线程立刻运行这个任务,对于阻塞队列中的任务不公平。这是因为创建每个 Worker(线程)对象会绑定一个初始任务,启动 Worker 时会优先执行
如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
当一个线程完成任务时,会从队列中取下一个任务来执行
当一个线程空闲超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后最终会收缩到 corePoolSize 大小
Executors
Executors 提供了四种线程池的创建:newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool
- newFixedThreadPool:创建一个拥有 n 个线程的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为 `Integer.MAX_VALUE`,也就是无界队列,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出)
- 适用于任务量已知,相对耗时的长期任务
- newCachedThreadPool:创建一个可扩容的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}- 核心线程数是 0, 最大线程数是 29 个 1,全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 **OOM**
- SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
- 适合任务数比较密集,但每个任务执行时间较短的情况
- newSingleThreadExecutor:创建一个只有 1 个线程的单线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}- 保证所有任务按照**指定顺序执行**,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放
对比:
创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,线程池会新建一个线程,保证池的正常工作
Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
原因:父类不能直接调用子类中的方法,需要反射或者创建对象的方式,可以调用子类静态方法
Executors.newFixedThreadPool(1) 初始时为 1,可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

开发要求
阿里巴巴 Java 开发手册要求:
线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题
如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者过度切换的问题
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回的线程池对象弊端如下:
FixedThreadPool 和 SingleThreadPool:请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
CacheThreadPool 和 ScheduledThreadPool:允许创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致 OOM
创建多大容量的线程池合适?
一般来说池中总线程数是核心池线程数量两倍,确保当核心池有线程停止时,核心池外有线程进入核心池
过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存
上下文切换:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态,任务从保存到再加载的过程就是一次上下文切换
核心线程数常用公式:
CPU 密集型任务 (N+1): 这种任务消耗的是 CPU 资源,可以将核心线程数设置为 N (CPU 核心数) + 1,比 CPU 核心数多出来的一个线程是为了防止线程发生缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 某个核心就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如在内存中对大量数据进行分析
I/O 密集型任务: 这种系统 CPU 处于阻塞状态,用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用,因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N 或 CPU 核数/ (1-阻塞系数),阻塞系数在 0.8~0.9 之间
IO 密集型就是涉及到网络读取,文件读取此类任务 ,特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上
提交方法
ExecutorService 类 API:
//执行任务(Executor 类 API)
void execute(Runnable command)
//提交任务 task()
Future< ?> submit(Runnable task)
//提交任务 task,用返回值 Future 获得任务执行结果
Future submit(Callable<T> task)
//提交 tasks 中所有任务
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
//提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常
List<Future<T>> invokeAll(Collection< ? extends Callable<T>> tasks, long timeout, TimeUnit unit)
//提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
T invokeAny(Collection< ? extends Callable<T>> tasks)execute 和 submit 都属于线程池的方法,对比:
execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是封装成 FutureTask,然后调用 execute 执行
execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出
关闭方法
ExecutorService 类 API:
//线程池状态变为 SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定任务)
void shutdown()
//线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回
List<Runnable> shutdownNow()
//不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true
boolean isShutdown()
//线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true
boolean isTerminated()
//调用 shutdown 后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit)处理异常
execute 会直接抛出任务执行时的异常,submit 会吞掉异常,有两种处理方法
方法 1:主动捉异常
ExecutorService executorService = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
System.out.println("task1");
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
}
});方法 2:使用 Future 对象
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<?> future = pool.submit(() -> {
System.out.println("task1");
int i = 1 / 0;
return true;
});
System.out.println(future.get());工作原理
状态信息
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值
状态表示:
// 高3位:表示当前线程池运行状态,除去高3位之后的低位:表示当前线程池中所拥有的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 表示在 ctl 中,低 COUNT_BITS 位,是用于存放当前线程数量的位private static final int COUNT_BITS = Integer.SIZE - 3;// 低 COUNT_BITS 位所能表达的最大数值,000 11111111111111111111 => 5亿多private static final int CAPACITY = (1 << COUNT_BITS) - 1;
四种状态:
// 111 000000000000000000,转换成整数后其实就是一个【负数】
private static final int RUNNING = -1 << COUNT_BITS;
// 000 000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;|状态|高3位|接收新任务|处理阻塞任务队列|说明| |---|---|---|---|---| |RUNNING|111|Y|Y|| |SHUTDOWN|000|N|Y|不接收新任务,但处理阻塞队列剩余任务| |STOP|001|N|N|中断正在执行的任务,并抛弃阻塞队列任务| |TIDYING|010|-|-|任务全执行完毕,活动线程为 0 即将进入终结| |TERMINATED|011|-|-|终止状态|
获取当前线程池运行状态:
// ~CAPACITY = ~000 11111111111111111111 = 111 000000000000000000000(取反)
// c == ctl = 111 000000000000000000111
// 111 000000000000000000111
// 111 000000000000000000000
// 111 000000000000000000000 获取到了运行状态private static int runStateOf(int c) { return c & ~CAPACITY; }获取当前线程池线程数量:
// c = 111 000000000000000000111
// CAPACITY = 000 111111111111111111111
// 000 000000000000000000111 => 7private static int workerCountOf(int c) { return c & CAPACITY; }重置当前线程池状态 ctl:
// rs 表示线程池状态,wc 表示当前线程池中 worker(线程)数量,相与以后就是合并后的状态
private static int ctlOf(int rs, int wc) { return rs | wc; }比较当前线程池 ctl 所表示的状态:
// 比较当前线程池 ctl 所表示的状态,是否小于某个状态 s
// 状态对比:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATEDprivate static boolean runStateLessThan(int c, int s) { return c < s; }// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态sprivate static boolean runStateAtLeast(int c, int s) { return c >= s; }// 小于 SHUTDOWN 的一定是 RUNNING,SHUTDOWN == 0private static boolean isRunning(int c) { return c < SHUTDOWN; }设置线程池 ctl:
// 使用 CAS 方式 让 ctl 值 +1 ,成功返回 true, 失败返回 false
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 使用 CAS 方式 让 ctl 值 -1 ,成功返回 true, 失败返回 false
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 将 ctl 值减一,do while 循环会一直重试,直到成功为止
private void decrementWorkerCount() {
do {} while (!compareAndDecrementWorkerCount(ctl.get()));
}成员属性
成员变量
线程池中存放 Worker 的容器:线程池没有初始化,直接往池中加线程即可
private final HashSet<Worker> workers = new HashSet<Worker>();线程全局锁:
// 增加减少 worker 或者时修改线程池运行状态需要持有 mainLock
private final ReentrantLock mainLock = new ReentrantLock();可重入锁的条件变量:
// 当外部线程调用 awaitTermination() 方法时,会等待当前线程池状态为 Termination 为止
private final Condition termination = mainLock.newCondition()线程池相关参数:
private volatile int corePoolSize; // 核心线程数量
private volatile int maximumPoolSize; // 线程池最大线程数量
private volatile long keepAliveTime; // 空闲线程存活时间
private volatile ThreadFactory threadFactory; // 创建线程时使用的线程工厂,默认是 DefaultThreadFactory
private final BlockingQueue<Runnable> workQueue;// 【超过核心线程提交任务就放入 阻塞队列】
private volatile RejectedExecutionHandler handler; // 拒绝策略,juc包提供了4中方式
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();// 默认策略- 记录线程池相关属性的数值:
private int largestPoolSize; // 记录线程池生命周期内线程数最大值
private long completedTaskCount; // 记录线程池所完成任务总数,当某个 worker 退出时将完成的任务累加到该属性控制核心线程数量内的线程是否可以被回收:
// false(默认)代表不可以,为 true 时核心线程空闲超过 keepAliveTime 也会被回收
// allowCoreThreadTimeOut(boolean value) 方法可以设置该值private volatile boolean allowCoreThreadTimeOut;
内部类:
- Worker 类:每个 Worker 对象会绑定一个初始任务,启动 Worker 时优先执行,这也是造成线程池不公平的原因。Worker 继承自 AQS,本身具有锁的特性,采用独占锁模式,state = 0 表示未被占用,> 0 表示被占用,< 0 表示初始状态不能被抢锁
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // worker 内部封装的工作线程
Runnable firstTask; // worker 第一个执行的任务,普通的 Runnable 实现类或者是 FutureTask
volatile long completedTasks; // 记录当前 worker 所完成任务数量
// 构造方法
Worker(Runnable firstTask) {
// 设置AQS独占模式为初始化中状态,这个状态不能被抢占锁
setState(-1);
// firstTask不为空时,当worker启动后,内部线程会优先执行firstTask,执行完后会到queue中去获取下个任务
this.firstTask = firstTask;
// 使用线程工厂创建一个线程,并且【将当前worker指定为Runnable】,所以thread启动时会调用 worker.run()
this.thread = getThreadFactory().newThread(this);
}
// 【不可重入锁】
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
public Thread newThread(Runnable r) {
// 将当前 worker 指定为 thread 的执行方法,线程调用 start 会调用 r.run()
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}- 拒绝策略相关的内部类
成员方法
提交方法
- AbstractExecutorService#submit():提交任务,把 Runnable 或 Callable 任务封装成 FutureTask 执行,可以通过方法返回的任务对象,调用 get 阻塞获取任务执行的结果或者异常,源码分析在笔记的 Future 部分
public Future<?> submit(Runnable task) {
// 空指针异常
if (task == null) throw new NullPointerException();
// 把 Runnable 封装成未来任务对象,执行结果就是 null,也可以通过参数指定 FutureTask#get 返回数据
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 执行方法
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 把 Callable 封装成未来任务对象
RunnableFuture<T> ftask = newTaskFor(task);
// 执行方法
execute(ftask);
// 返回未来任务对象,用来获取返回值
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// Runnable 封装成 FutureTask,【指定返回值】
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// Callable 直接封装成 FutureTask
return new FutureTask<T>(callable);
}- execute():执行任务,但是没有返回值,没办法获取任务执行结果,出现异常会直接抛出任务执行时的异常。根据线程池中的线程数,选择添加任务时的处理方式
// command 可以是普通的 Runnable 实现类,也可以是 FutureTask,不能是 Callable
public void execute(Runnable command) {
// 非空判断
if (command == null)
throw new NullPointerException();
// 获取 ctl 最新值赋值给 c,ctl 高 3 位表示线程池状态,低位表示当前线程池线程数量。
int c = ctl.get();
// 【1】当前线程数量小于核心线程数,此次提交任务直接创建一个新的 worker,线程池中多了一个新的线程
if (workerCountOf(c) < corePoolSize) {
// addWorker 为创建线程的过程,会创建 worker 对象并且将 command 作为 firstTask,优先执行
if (addWorker(command, true))
return;
// 执行到这条语句,说明 addWorker 一定是失败的,存在并发现象或者线程池状态被改变,重新获取状态
// SHUTDOWN 状态下也有可能创建成功,前提 firstTask == null 而且当前 queue 不为空(特殊情况)
c = ctl.get();
}
// 【2】执行到这说明当前线程数量已经达到核心线程数量 或者 addWorker 失败
// 判断当前线程池是否处于running状态,成立就尝试将 task 放入到 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 条件一成立说明线程池状态被外部线程给修改了,可能是执行了 shutdown() 方法,该状态不能接收新提交的任务
// 所以要把刚提交的任务删除,删除成功说明提交之后线程池中的线程还未消费(处理)该任务
if (!isRunning(recheck) && remove(command))
// 任务出队成功,走拒绝策略
reject(command);
// 执行到这说明线程池是 running 状态,获取线程池中的线程数量,判断是否是 0
// 【担保机制】,保证线程池在 running 状态下,最起码得有一个线程在工作
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 【3】offer失败说明queue满了
// 如果线程数量尚未达到 maximumPoolSize,会创建非核心 worker 线程直接执行 command,【这也是不公平的原因】
// 如果当前线程数量达到 maximumPoolSiz,这里 addWorker 也会失败,走拒绝策略
else if (!addWorker(command, false))
reject(command);
}添加线程
- prestartAllCoreThreads():提前预热,创建所有的核心线程
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}addWorker():添加线程到线程池,返回 true 表示创建 Worker 成功,且线程启动。首先判断线程池是否允许添加线程,允许就让线程数量 + 1,然后去创建 Worker 加入线程池
注意:SHUTDOWN 状态也能添加线程,但是要求新加的 Woker 没有 firstTask,而且当前 queue 不为空,所以创建一个线程来帮助线程池执行队列中的任务
// core == true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋【判断当前线程池状态是否允许创建线程】,允许就设置线程数量 + 1
retry:
for (;;) {
// 获取 ctl 的值
int c = ctl.get();
// 获取当前线程池运行状态
int rs = runStateOf(c);
// 判断当前线程池状态【是否允许添加线程】
// 当前线程池是 SHUTDOWN 状态,但是队列里面还有任务尚未处理完,需要处理完 queue 中的任务
// 【不允许再提交新的 task,所以 firstTask 为空,但是可以继续添加 worker】
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程池中线程数量
int wc = workerCountOf(c);
// 条件一一般不成立,CAPACITY是5亿多,根据 core 判断使用哪个大小限制线程数量,超过了返回 false
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 记录线程数量已经加 1,类比于申请到了一块令牌,条件失败说明其他线程修改了数量
if (compareAndIncrementWorkerCount(c))
// 申请成功,跳出了 retry 这个 for 自旋
break retry;
// CAS 失败,没有成功的申请到令牌
c = ctl.get();
// 判断当前线程池状态是否发生过变化,被其他线程修改了,可能其他线程调用了 shutdown() 方法
if (runStateOf(c) != rs)
// 返回外层循环检查是否能创建线程,在 if 语句中返回 false
continue retry;
}
}
//【令牌申请成功,开始创建线程】
// 运行标记,表示创建的 worker 是否已经启动,false未启动 true启动
boolean workerStarted = false;
// 添加标记,表示创建的 worker 是否添加到池子中了,默认false未添加,true是添加。
boolean workerAdded = false;
Worker w = null;
try {
// 【创建 Worker,底层通过线程工厂 newThread 方法创建执行线程,指定了首先执行的任务】
w = new Worker(firstTask);
// 将新创建的 worker 节点中的线程赋值给 t
final Thread t = w.thread;
// 这里的判断为了防止 程序员自定义的 ThreadFactory 实现类有 bug,创造不出线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加互斥锁,要添加 worker 了
mainLock.lock();
try {
// 获取最新线程池运行状态保存到 rs
int rs = runStateOf(ctl.get());
// 判断线程池是否为RUNNING状态,不是再【判断当前是否为SHUTDOWN状态且firstTask为空,特殊情况】
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// 当线程start后,线程isAlive会返回true,这里还没开始启动线程,如果被启动了就需要报错
if (t.isAlive())
throw new IllegalThreadStateException();
//【将新建的 Worker 添加到线程池中】
workers.add(w);
int s = workers.size();
// 当前池中的线程数量是一个新高,更新 largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
// 添加标记置为 true
workerAdded = true;
}
} finally {
// 解锁啊
mainLock.unlock();
}
// 添加成功就【启动线程执行任务】
if (workerAdded) {
// Thread 类中持有 Runnable 任务对象,调用的是 Runnable 的 run ,也就是 FutureTask
t.start();
// 运行标记置为 true
workerStarted = true;
}
}
} finally {
// 如果启动线程失败,做清理工作
if (! workerStarted)
addWorkerFailed(w);
}
// 返回新创建的线程是否启动
return workerStarted;
}- addWorkerFailed():清理任务
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 持有线程池全局锁,因为操作的是线程池相关的东西
mainLock.lock();
try {
//条件成立需要将 worker 在 workers 中清理出去。
if (w != null)
workers.remove(w);
// 将线程池计数 -1,相当于归还令牌。
decrementWorkerCount();
// 尝试停止线程池
tryTerminate();
} finally {
//释放线程池全局锁。
mainLock.unlock();
}
}运行方法
- Worker#run:Worker 实现了 Runnable 接口,当线程启动时,会调用 Worker 的 run() 方法
public void run() {
// ThreadPoolExecutor#runWorker()
runWorker(this);
}- runWorker():线程启动就要执行任务,会一直 while 循环获取任务并执行
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取 worker 的 firstTask
Runnable task = w.firstTask;
// 引用置空,【防止复用该线程时重复执行该任务】
w.firstTask = null;
// 初始化 worker 时设置 state = -1,表示不允许抢占锁
// 这里需要设置 state = 0 和 exclusiveOwnerThread = null,开始独占模式抢锁
w.unlock();
// true 表示发生异常退出,false 表示正常退出。
boolean completedAbruptly = true;
try {
// firstTask 不是 null 就直接运行,否则去 queue 中获取任务
// 【getTask 如果是阻塞获取任务,会一直阻塞在take方法,直到获取任务,不会走返回null的逻辑】
while (task != null || (task = getTask()) != null) {
// worker 加锁,shutdown 时会判断当前 worker 状态,【根据独占锁状态判断是否空闲】
w.lock();
// 说明线程池状态大于 STOP,目前处于 STOP/TIDYING/TERMINATION,此时给线程一个中断信号
if ((runStateAtLeast(ctl.get(), STOP) ||
// 说明线程处于 RUNNING 或者 SHUTDOWN 状态,清除打断标记
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
// 中断线程,设置线程的中断标志位为 true
wt.interrupt();
try {
// 钩子方法,【任务执行的前置处理】
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 【执行任务】
task.run();
} catch (Exception x) {
//.....
} finally {
// 钩子方法,【任务执行的后置处理】
afterExecute(task, thrown);
}
} finally {
task = null; // 将局部变量task置为null,代表任务执行完成
w.completedTasks++; // 更新worker完成任务数量
w.unlock(); // 解锁
}
}
// getTask()方法返回null时会走到这里,表示queue为空并且线程空闲超过保活时间,【当前线程执行退出逻辑】
completedAbruptly = false;
} finally {
// 正常退出 completedAbruptly = false
// 异常退出 completedAbruptly = true,【从 task.run() 内部抛出异常】时,跳到这一行
processWorkerExit(w, completedAbruptly);
}
}- unlock():重置锁
public void unlock() { release(1); }
// 外部不会直接调用这个方法 这个方法是 AQS 内调用的,外部调用 unlock 时触发此方法
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null); // 设置持有者为 null
setState(0); // 设置 state = 0
return true;
}- getTask():获取任务,线程空闲时间超过 keepAliveTime 就会被回收,判断的依据是当前线程阻塞获取任务超过保活时间,方法返回 null 就代表当前线程要被回收了,返回到 runWorker 执行线程退出逻辑。线程池具有担保机制,对于 RUNNING 状态下的超时回收,要保证线程池中最少有一个线程运行,或者任务阻塞队列已经是空
private Runnable getTask() {
// 超时标记,表示当前线程获取任务是否超时,true 表示已超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
// 获取线程池当前运行状态
int rs = runStateOf(c);
// 【tryTerminate】打断线程后执行到这,此时线程池状态为STOP或者线程池状态为SHUTDOWN并且队列已经是空
// 所以下面的 if 条件一定是成立的,可以直接返回 null,线程就应该退出了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 使用 CAS 自旋的方式让 ctl 值 -1
decrementWorkerCount();
return null;
}
// 获取线程池中的线程数量
int wc = workerCountOf(c);
// 线程没有明确的区分谁是核心或者非核心线程,是根据当前池中的线程数量判断
// timed = false 表示当前这个线程 获取task时不支持超时机制的,当前线程会使用 queue.take() 阻塞获取
// timed = true 表示当前这个线程 获取task时支持超时机制,使用 queue.poll(xxx,xxx) 超时获取
// 条件一代表允许回收核心线程,那就无所谓了,全部线程都执行超时回收
// 条件二成立说明线程数量大于核心线程数,当前线程认为是非核心线程,有保活时间,去超时获取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果线程数量是否超过最大线程数,直接回收
// 如果当前线程【允许超时回收并且已经超时了】,就应该被回收了,由于【担保机制】还要做判断:
// wc > 1 说明线程池还用其他线程,当前线程可以直接回收
// workQueue.isEmpty() 前置条件是 wc = 1,【如果当前任务队列也是空了,最后一个线程就可以退出】
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
// 使用 CAS 机制将 ctl 值 -1 ,减 1 成功的线程,返回 null,代表可以退出
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据当前线程是否需要超时回收,【选择从队列获取任务的方法】是超时获取或者阻塞获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
// 获取到任务返回任务,【阻塞获取会阻塞到获取任务为止】,不会返回 null
if (r != null)
return r;
// 获取任务为 null 说明超时了,将超时标记设置为 true,下次自旋时返 null
timedOut = true;
} catch (InterruptedException retry) {
// 阻塞线程被打断后超时标记置为 false,【说明被打断不算超时】,要继续获取,直到超时或者获取到任务
// 如果线程池 SHUTDOWN 状态下的打断,会在循环获取任务前判断,返回 null
timedOut = false;
}
}
}- processWorkerExit():线程退出线程池,也有担保机制,保证队列中的任务被执行
// 正常退出 completedAbruptly = false,异常退出为 true
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 条件成立代表当前 worker 是发生异常退出的,task 任务执行过程中向上抛出异常了
if (completedAbruptly)
// 从异常时到这里 ctl 一直没有 -1,需要在这里 -1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 将当前 worker 完成的 task 数量,汇总到线程池的 completedTaskCount
completedTaskCount += w.completedTasks;
// 将 worker 从线程池中移除
workers.remove(w);
} finally {
mainLock.unlock(); // 解锁
}
// 尝试停止线程池,唤醒下一个线程
tryTerminate();
int c = ctl.get();
// 线程池不是停止状态就应该有线程运行【担保机制】
if (runStateLessThan(c, STOP)) {
// 正常退出的逻辑,是对空闲线程回收,不是执行出错
if (!completedAbruptly) {
// 根据是否回收核心线程确定【线程池中的线程数量最小值】
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 最小值为 0,但是线程队列不为空,需要一个线程来完成任务担保机制
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 线程池中的线程数量大于最小值可以直接返回
if (workerCountOf(c) >= min)
return;
}
// 执行 task 时发生异常,有个线程因为异常终止了,需要添加
// 或者线程池中的数量小于最小值,这里要创建一个新 worker 加进线程池
addWorker(null, false);
}
}停止方法
- shutdown():停止线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 获取线程池全局锁
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为 SHUTDOWN,如果线程池状态大于 SHUTDOWN,就不会设置直接返回
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 空方法,子类可以扩展
onShutdown();
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
tryTerminate();
}- interruptIdleWorkers():shutdown 方法会中断所有空闲线程,根据是否可以获取 AQS 独占锁判断是否处于工作状态。线程之所以空闲是因为阻塞队列没有任务,不会中断正在运行的线程,所以 shutdown 方法会让所有的任务执行完毕
// onlyOne == true 说明只中断一个线程 ,false 则中断所有线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
/ /持有全局锁
mainLock.lock();
try {
// 遍历所有 worker
for (Worker w : workers) {
// 获取当前 worker 的线程
Thread t = w.thread;
// 条件一成立:说明当前迭代的这个线程尚未中断
// 条件二成立:说明【当前worker处于空闲状态】,阻塞在poll或者take,因为worker执行task时是要加锁的
// 每个worker有一个独占锁,w.tryLock()尝试加锁,加锁成功返回 true
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断线程,处于 queue 阻塞的线程会被唤醒,进入下一次自旋,返回 null,执行退出相逻辑
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 释放worker的独占锁
w.unlock();
}
}
// false,代表中断所有的线程
if (onlyOne)
break;
}
} finally {
// 释放全局锁
mainLock.unlock();
}
}- shutdownNow():直接关闭线程池,不会等待任务执行完成
public List<Runnable> shutdownNow() {
// 返回值引用
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 获取线程池全局锁
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为STOP
advanceRunState(STOP);
// 中断线程池中【所有线程】
interruptWorkers();
// 从阻塞队列中导出未处理的task
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
// 返回当前任务队列中 未处理的任务。
return tasks;
}- tryTerminate():设置为 TERMINATED 状态 if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty)
final void tryTerminate() {
for (;;) {
// 获取 ctl 的值
int c = ctl.get();
// 线程池正常,或者有其他线程执行了状态转换的方法,当前线程直接返回
if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
// 线程池是 SHUTDOWN 并且任务队列不是空,需要去处理队列中的任务
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 执行到这里说明线程池状态为 STOP 或者线程池状态为 SHUTDOWN 并且队列已经是空
// 判断线程池中线程的数量
if (workerCountOf(c) != 0) {
// 【中断一个空闲线程】,在 queue.take() | queue.poll() 阻塞空闲
// 唤醒后的线程会在getTask()方法返回null,
// 执行 processWorkerExit 退出逻辑时会再次调用 tryTerminate() 唤醒下一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
// 池中的线程数量为 0 来到这里
final ReentrantLock mainLock = this.mainLock;
// 加全局锁
mainLock.lock();
try {
// 设置线程池状态为 TIDYING 状态,线程数量为 0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 结束线程池
terminated();
} finally {
// 设置线程池状态为TERMINATED状态。
ctl.set(ctlOf(TERMINATED, 0));
// 【唤醒所有调用 awaitTermination() 方法的线程】
termination.signalAll();
}
return;
}
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
}
}Future
线程使用
FutureTask 未来任务对象,继承 Runnable、Future 接口,用于包装 Callable 对象,实现任务的提交
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello World";
}
});
new Thread(task).start(); //启动线程
String msg = task.get(); //获取返回任务数据
System.out.println(msg);
}构造方法:
public FutureTask(Callable<V> callable){
this.callable = callable; // 属性注入
this.state = NEW; // 任务状态设置为 new
}
public FutureTask(Runnable runnable, V result) {
// 适配器模式
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 使用装饰者模式将 runnable 转换成 callable 接口,外部线程通过 get 获取
// 当前任务执行结果时,结果可能为 null 也可能为传进来的值,【传进来什么返回什么】
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
// 构造方法
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
// 实则调用 Runnable#run 方法
task.run();
// 返回值为构造 FutureTask 对象时传入的返回值或者是 null
return result;
}
}成员属性
FutureTask 类的成员属性:
- 任务状态:
// 表示当前task状态
private volatile int state;
// 当前任务尚未执行
private static final int NEW = 0;
// 当前任务正在结束,尚未完全结束,一种临界状态
private static final int COMPLETING = 1;
// 当前任务正常结束
private static final int NORMAL = 2;
// 当前任务执行过程中发生了异常,内部封装的 callable.run() 向上抛出异常了
private static final int EXCEPTIONAL = 3;
// 当前任务被取消
private static final int CANCELLED = 4;
// 当前任务中断中
private static final int INTERRUPTING = 5;
// 当前任务已中断
private static final int INTERRUPTED = 6;任务对象:
private Callable<V> callable; // Runnable 使用装饰者模式伪装成 Callable存储任务执行的结果,这是 run 方法返回值是 void 也可以获取到执行结果的原因:
// 正常情况下:任务正常执行结束,outcome 保存执行结果,callable 返回值
// 非正常情况:callable 向上抛出异常,outcome 保存异常private Object outcome;执行当前任务的线程对象:
private volatile Thread runner; // 当前任务被线程执行期间,保存当前执行任务的线程对象引用线程阻塞队列的头节点:
// 会有很多线程去 get 当前任务的结果,这里使用了一种数据结构头插头取(类似栈)的一个队列来保存所有的 get 线程
private volatile WaitNode waiters;内部类:
static final class WaitNode {
// 单向链表
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}成员方法
FutureTask 类的成员方法:
- FutureTask#run:任务执行入口
public void run() {
//条件一:成立说明当前 task 已经被执行过了或者被 cancel 了,非 NEW 状态的任务,线程就不需要处理了
//条件二:线程是 NEW 状态,尝试设置当前任务对象的线程是当前线程,设置失败说明其他线程抢占了该任务,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
// 执行到这里,当前 task 一定是 NEW 状态,而且【当前线程也抢占 task 成功】
Callable<V> c = callable;
// 判断任务是否为空,防止空指针异常;判断 state 状态,防止外部线程在此期间 cancel 掉当前任务
// 【因为 task 的执行者已经设置为当前线程,所以这里是线程安全的】
if (c != null && state == NEW) {
V result;
// true 表示 callable.run 代码块执行成功 未抛出异常
// false 表示 callable.run 代码块执行失败 抛出异常
boolean ran;
try {
// 【调用自定义的方法,执行结果赋值给 result】
result = c.call();
// 没有出现异常
ran = true;
} catch (Throwable ex) {
// 出现异常,返回值置空,ran 置为 false
result = null;
ran = false;
// 设置返回的异常
setException(ex);
}
// 代码块执行正常
if (ran)
// 设置返回的结果
set(result);
}
} finally {
// 任务执行完成,取消线程的引用,help GC
runner = null;
int s = state;
// 判断任务是不是被中断
if (s >= INTERRUPTING)
// 执行中断处理方法
handlePossibleCancellationInterrupt(s);
}
}FutureTask#set:设置正常返回值,首先将任务状态设置为 COMPLETING 状态代表完成中,逻辑执行完设置为 NORMAL 状态代表任务正常执行完成,最后唤醒 get() 阻塞线程
protected void set(V v) {
// CAS 方式设置当前任务状态为完成中,设置失败说明其他线程取消了该任务
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 【将结果赋值给 outcome】
outcome = v;
// 将当前任务状态修改为 NORMAL 正常结束状态。
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}FutureTask#setException:设置异常返回值
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 赋值给返回结果,用来向上层抛出来的异常
outcome = t;
// 将当前任务的状态 修改为 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}FutureTask#finishCompletion:**唤醒 get() 阻塞线程**
private void finishCompletion() {
// 遍历所有的等待的节点,q 指向头节点
for (WaitNode q; (q = waiters) != null;) {
// 使用cas设置 waiters 为 null,防止外部线程使用cancel取消当前任务,触发finishCompletion方法重复执行
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 自旋
for (;;) {
// 获取当前 WaitNode 节点封装的 thread
Thread t = q.thread;
// 当前线程不为 null,唤醒当前 get() 等待获取数据的线程
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
// 获取当前节点的下一个节点
WaitNode next = q.next;
// 当前节点是最后一个节点了
if (next == null)
break;
// 断开链表
q.next = null; // help gc
q = next;
}
break;
}
}
done();
callable = null; // help GC
}FutureTask#handlePossibleCancellationInterrupt:任务中断处理
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
// 中断状态中
while (state == INTERRUPTING)
// 等待中断完成
Thread.yield();
}- FutureTask#get:获取任务执行的返回值,执行 run 和 get 的不是同一个线程,一般有多个线程 get,只有一个线程 run
public V get() throws InterruptedException, ExecutionException {
// 获取当前任务状态
int s = state;
// 条件成立说明任务还没执行完成
if (s <= COMPLETING)
// 返回 task 当前状态,可能当前线程在里面已经睡了一会
s = awaitDone(false, 0L);
return report(s);
}FutureTask#awaitDone:**get 线程封装成 WaitNode 对象进入阻塞队列阻塞等待**
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// 0 不带超时
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 引用当前线程,封装成 WaitNode 对象
WaitNode q = null;
// 表示当前线程 waitNode 对象,是否进入阻塞队列
boolean queued = false;
// 【三次自旋开始休眠】
for (;;) {
// 判断当前 get() 线程是否被打断,打断返回 true,清除打断标记
if (Thread.interrupted()) {
// 当前线程对应的等待 node 出队,
removeWaiter(q);
throw new InterruptedException();
}
// 获取任务状态
int s = state;
// 条件成立说明当前任务执行完成已经有结果了
if (s > COMPLETING) {
// 条件成立说明已经为当前线程创建了 WaitNode,置空 help GC
if (q != null)
q.thread = null;
// 返回当前的状态
return s;
}
// 条件成立说明当前任务接近完成状态,这里让当前线程释放一下 cpu ,等待进行下一次抢占 cpu
else if (s == COMPLETING)
Thread.yield();
// 【第一次自旋】,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象
else if (q == null)
q = new WaitNode();
// 【第二次自旋】,当前线程已经创建 WaitNode 对象了,但是node对象还未入队
else if (!queued)
// waiters 指向队首,让当前 WaitNode 成为新的队首,【头插法】,失败说明其他线程修改了新的队首
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
// 【第三次自旋】,会到这里,或者 else 内
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞指定的时间
LockSupport.parkNanos(this, nanos);
}
// 条件成立:说明需要阻塞
else
// 【当前 get 操作的线程被 park 阻塞】,除非有其它线程将唤醒或者将当前线程中断
LockSupport.park(this);
}
}FutureTask#report:封装运行结果,可以获取 run() 方法中设置的成员变量 outcome,**这是 run 方法的返回值是 void 也可以获取到任务执行的结果的原因**
private V report(int s) throws ExecutionException {
// 获取执行结果,是在一个 futuretask 对象中的属性,可以直接获取
Object x = outcome;
// 当前任务状态正常结束
if (s == NORMAL)
return (V)x; // 直接返回 callable 的逻辑结果
// 当前任务被取消或者中断
if (s >= CANCELLED)
throw new CancellationException(); // 抛出异常
// 执行到这里说明自定义的 callable 中的方法有异常,使用 outcome 上层抛出异常
throw new ExecutionException((Throwable)x);
}- FutureTask#cancel:任务取消,打断正在执行该任务的线程
public boolean cancel(boolean mayInterruptIfRunning) {
// 条件一:表示当前任务处于运行中或者处于线程池任务队列中
// 条件二:表示修改状态,成功可以去执行下面逻辑,否则返回 false 表示 cancel 失败
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 如果任务已经被执行,是否允许打断
if (mayInterruptIfRunning) {
try {
// 获取执行当前 FutureTask 的线程
Thread t = runner;
if (t != null)
// 打断执行的线程
t.interrupt();
} finally {
// 设置任务状态为【中断完成】
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒所有 get() 阻塞的线程
finishCompletion();
}
return true;
}任务调度
Timer
Timer 实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
private static void method1() {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task 1");
//int i = 1 / 0;//任务一的出错会导致任务二无法执行
Thread.sleep(2000);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
System.out.println("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此任务1的延时,影响了任务2的执行
timer.schedule(task1, 1000);//17:45:56 c.ThreadPool [Timer-0] - task 1
timer.schedule(task2, 1000);//17:45:58 c.ThreadPool [Timer-0] - task 2
}Scheduled
任务调度线程池 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor:
使用内部类 ScheduledFutureTask 封装任务
使用内部类 DelayedWorkQueue 作为线程池队列
重写 onShutdown 方法去处理 shutdown 后的任务
提供 decorateTask 方法作为 ScheduledFutureTask 的修饰方法,以便开发者进行扩展
构造方法:Executors.newScheduledThreadPool(int corePoolSize)
public ScheduledThreadPoolExecutor(int corePoolSize) {
// 最大线程数固定为 Integer.MAX_VALUE,保活时间 keepAliveTime 固定为 0
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
// 阻塞队列是 DelayedWorkQueue
new DelayedWorkQueue());
}常用 API:
ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u):延迟执行任务ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit):定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、单位ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit):定时执行周期任务,考虑执行的耗时,参数为初始延迟时间、间隔时间、单位
基本使用:
- 延迟任务,但是出现异常并不会在控制台打印,也不会影响其他线程的执行
public static void main(String[] args){
// 线程池大小为1时也是串行执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,都在 1s 后同时执行
executor.schedule(() -> {
System.out.println("任务1,执行时间:" + new Date());
//int i = 1 / 0;
try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
}- 定时任务 scheduleAtFixedRate:一次任务的启动到下一次任务的启动之间只要大于等于间隔时间,抢占到 CPU 就会立即执行
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
System.out.println("start..." + new Date());
pool.scheduleAtFixedRate(() -> {
System.out.println("running..." + new Date());
Thread.sleep(2000);
}, 1, 1, TimeUnit.SECONDS);
}
/*start...Sat Apr 24 18:08:12 CST 2021
running...Sat Apr 24 18:08:13 CST 2021
running...Sat Apr 24 18:08:15 CST 2021
running...Sat Apr 24 18:08:17 CST 2021*/- 定时任务 scheduleWithFixedDelay:一次任务的结束到下一次任务的启动之间等于间隔时间,抢占到 CPU 就会立即执行,这个方法才是真正的设置两个任务之间的间隔
public static void main(String[] args){
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
System.out.println("start..." + new Date());
pool.scheduleWithFixedDelay(() -> {
System.out.println("running..." + new Date());
Thread.sleep(2000);
}, 1, 1, TimeUnit.SECONDS);
}
/*start...Sat Apr 24 18:11:41 CST 2021
running...Sat Apr 24 18:11:42 CST 2021
running...Sat Apr 24 18:11:45 CST 2021
running...Sat Apr 24 18:11:48 CST 2021*/成员属性
成员变量
shutdown 后是否继续执行周期任务:
private volatile boolean continueExistingPeriodicTasksAfterShutdown;shutdown 后是否继续执行延迟任务:
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;取消方法是否将该任务从队列中移除:
// 默认 false,不移除,等到线程拿到任务之后抛弃
private volatile boolean removeOnCancel = false;任务的序列号,可以用来比较优先级:
private static final AtomicLong sequencer = new AtomicLong();
延迟任务
ScheduledFutureTask 继承 FutureTask,实现 RunnableScheduledFuture 接口,具有延迟执行的特点,覆盖 FutureTask 的 run 方法来实现对延时执行、周期执行的支持。对于延时任务调用 FutureTask#run,而对于周期性任务则调用 FutureTask#runAndReset 并且在成功之后根据 fixed-delay/fixed-rate 模式来设置下次执行时间并重新将任务塞到工作队列
在调度线程池中无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask
成员变量:
任务序列号:
private final long sequenceNumber;执行时间:
private long time; // 任务可以被执行的时间,交付时间,以纳秒表示
private final long period; // 0 表示非周期任务,正数表示 fixed-rate 模式的周期,负数表示 fixed-delay 模式
fixed-rate:两次开始启动的间隔,fixed-delay:一次执行结束到下一次开始启动实际的任务对象:
RunnableScheduledFuture<V> outerTask = this;任务在队列数组中的索引下标:
// DelayedWorkQueue 底层使用的数据结构是最小堆,记录当前任务在堆中的索引,-1 代表删除
int heapIndex;
成员方法:
- 构造方法:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
// 任务的触发时间
this.time = ns;
// 任务的周期,多长时间执行一次
this.period = period;
// 任务的序号
this.sequenceNumber = sequencer.getAndIncrement();
}- compareTo():ScheduledFutureTask 根据执行时间 time 正序排列,如果执行时间相同,在按照序列号 sequenceNumber 正序排列,任务需要放入 DelayedWorkQueue,延迟队列中使用该方法按照从小到大进行排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
// 类型强转
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
// 比较者 - 被比较者的执行时间
long diff = time - x.time;
// 比较者先执行
if (diff < 0)
return -1;
// 被比较者先执行
else if (diff > 0)
return 1;
// 比较者的序列号小
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 不是 ScheduledFutureTask 类型时,根据延迟时间排序
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}- run():执行任务,非周期任务直接完成直接结束,周期任务执行完后会设置下一次的执行时间,重新放入线程池的阻塞队列,如果线程池中的线程数量少于核心线程,就会添加 Worker 开启新线程
public void run() {
// 是否周期性,就是判断 period 是否为 0
boolean periodic = isPeriodic();
// 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 非周期任务,直接调用 FutureTask#run 执行
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期任务的执行,返回 true 表示执行成功
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置周期任务的下一次执行时间
setNextRunTime();
// 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程
reExecutePeriodic(outerTask);
}
}周期任务正常完成后**任务的状态不会变化**,依旧是 NEW,不会设置 outcome 属性。但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中,方法返回 false,后续的该任务将不会再周期的执行
protected boolean runAndReset() {
// 任务不是新建的状态了,或者被别的线程执行了,直接返回 false
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 执行方法,没有返回值
c.call();
ran = true;
} catch (Throwable ex) {
// 出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程
setException(ex);
}
}
} finally {
// 执行完成把执行线程引用置为 null
runner = null;
s = state;
// 如果线程被中断进行中断处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 如果正常执行,返回 true,并且任务状态没有被取消
return ran && s == NEW;
}
// 任务下一次的触发时间
private void setNextRunTime() {
long p = period;
if (p > 0)
// fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差
time += p;
else
// fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】
time = triggerTime(-p);
}- reExecutePeriodic():准备任务的下一次执行,重新放入阻塞任务队列
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 【放入任务队列】
super.getQueue().add(task);
// 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行,
// 如果不能执行且任务还在队列中未被取走,则取消任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】
ensurePrestart();
}
}- cancel():取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用父类 FutureTask#cancel 来取消任务
boolean cancelled = super.cancel(mayInterruptIfRunning);
// removeOnCancel 用于控制任务取消后是否应该从阻塞队列中移除
if (cancelled && removeOnCancel && heapIndex >= 0)
// 从等待队列中删除该任务,并调用 tryTerminate() 判断是否需要停止线程池
remove(this);
return cancelled;
}延迟队列
DelayedWorkQueue 是支持延时获取元素的阻塞队列,内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素
其他阻塞队列存储节点的数据结构大都是链表,延迟队列是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常
成员变量:
- 容量:
private static final int INITIAL_CAPACITY = 16; // 初始容量
private int size = 0; // 节点数量
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 存放节点- 锁:
private final ReentrantLock lock = new ReentrantLock(); // 控制并发
private final Condition available = lock.newCondition();// 条件队列阻塞等待头节点的线程:线程池内的某个线程去 take() 获取任务时,如果延迟队列顶层节点不为 null(队列内有任务),但是节点任务还不到触发时间,线程就去检查队列的 leader字段是否被占用
如果未被占用,则当前线程占用该字段,然后当前线程到 available 条件队列指定超时时间
堆顶任务.time - now()挂起如果被占用,当前线程直接到 available 条件队列不指定超时时间的挂起
// leader 在 available 条件队列内是首元素,它超时之后会醒过来,然后再次将堆顶元素获取走,获取走之后,take()结束之前,会调用是 available.signal() 唤醒下一个条件队列内的等待者,然后释放 lock,下一个等待者被唤醒后去到 AQS 队列,做 acquireQueue(node) 逻辑
private Thread leader = null;
成员方法
- offer():插入节点
public boolean offer(Runnable x) {
// 判空
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 队列锁,增加删除数据时都要加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 队列数量大于存放节点的数组长度,需要扩容
if (i >= queue.length)
// 扩容为原来长度的 1.5 倍
grow();
size = i + 1;
// 当前是第一个要插入的节点
if (i == 0) {
queue[0] = e;
// 修改 ScheduledFutureTask 的 heapIndex 属性,表示该对象在队列里的下标
setIndex(e, 0);
} else {
// 向上调整元素的位置,并更新 heapIndex
siftUp(i, e);
}
// 情况1:当前任务是第一个加入到 queue 内的任务,所以在当前任务加入到 queue 之前,take() 线程会直接
// 到 available 队列不设置超时的挂起,并不会去占用 leader 字段,这时需会唤醒一个线程 让它去消费
// 情况2:当前任务【优先级最高】,原堆顶任务可能还未到触发时间,leader 线程设置超时的在 available 挂起
// 原先的 leader 等待的是原先的头节点,所以 leader 已经无效,需要将 leader 线程唤醒,
// 唤醒之后它会检查堆顶,如果堆顶任务可以被消费,则直接获取走,否则继续成为 leader 等待新堆顶任务
if (queue[0] == e) {
// 将 leader 设置为 null
leader = null;
// 直接随便唤醒等待头结点的阻塞线程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}// 插入新节点后对堆进行调整,进行节点上移,保持其特性【节点的值小于子节点的值】,小顶堆
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 父节点,就是堆排序
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// key 和父节点比,如果大于父节点可以直接返回,否则就继续上浮
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}- poll():非阻塞获取头结点,获取执行时间最近并且可以执行的
// 非阻塞获取
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取队头节点,因为是小顶堆
RunnableScheduledFuture<?> first = queue[0];
// 头结点为空或者的延迟时间没到返回 null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 头结点达到延迟时间,【尾节点成为替代节点下移调整堆结构】,返回头结点
return finishPoll(first);
} finally {
lock.unlock();
}
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 获取尾索引
int s = --size;
// 获取尾节点
RunnableScheduledFuture<?> x = queue[s];
// 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调
queue[s] = null;
// s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情
if (s != 0)
// 从索引处 0 开始向下调整
siftDown(0, x);
// 出队的元素索引设置为 -1
setIndex(f, -1);
return f;
}- take():阻塞获取头节点,读取当前堆中最小的也就是触发时间最近的任务
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 保证线程安全
lock.lockInterruptibly();
try {
for (;;) {
// 头节点
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
// 等待队列不空,直至有任务通过 offer 入队并唤醒
available.await();
else {
// 获取头节点的延迟时间是否到时
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部
return finishPoll(first);
// 逻辑到这说明头节点的延迟时间还没到
first = null;
// 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待
if (leader != null)
available.await();
else {
// 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 在条件队列 available 使用带超时的挂起(堆顶任务.time - now() 纳秒值..)
available.awaitNanos(delay);
// 到达阻塞时间时,当前线程会从这里醒来来
} finally {
// t堆顶更新,leader 置为 null,offer 方法释放锁后,
// 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。
if (leader == thisThread)
// leader 置为 null 用以接下来判断是否需要唤醒后继线程
leader = null;
}
}
}
}
} finally {
// 没有 leader 线程,头结点不为 null,唤醒阻塞获取头节点的线程,
// 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}- remove():删除节点,堆移除一个元素的时间复杂度是 O(log n),延迟任务维护了 heapIndex,直接访问的时间复杂度是 O(1),从而可以更快的移除元素,任务在队列中被取消后会进入该逻辑
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 查找对象在队列数组中的下标
int i = indexOf(x);
// 节点不存在,返回 false
if (i < 0)
return false;
// 修改元素的 heapIndex,-1 代表删除
setIndex(queue[i], -1);
// 尾索引是长度-1
int s = --size;
// 尾节点作为替代节点
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// s == i 说明头节点就是尾节点,队列空了
if (s != i) {
// 向下调整
siftDown(i, replacement);
// 说明没发生调整
if (queue[i] == replacement)
// 上移和下移不可能同时发生,替代节点大于子节点时下移,否则上移
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}成员方法
提交任务
- schedule():延迟执行方法,并指定执行的时间,默认是当前时间
public void execute(Runnable command) {
// 以零延时任务的形式实现
schedule(command, 0, NANOSECONDS);
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 判空
if (command == null || unit == null) throw new NullPointerException();
// 没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展,并且【根据延迟时间设置任务触发的时间点】
RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(
command, null, triggerTime(delay, unit)));
// 延迟执行
delayedExecute(t);
return t;
}
// 返回【当前时间 + 延迟时间】,就是触发当前任务执行的时间
private long triggerTime(long delay, TimeUnit unit) {
// 设置触发的时间
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
// 如果 delay < Long.Max_VALUE/2,则下次执行时间为当前时间 +delay
// 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}overflowFree 的原因:如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。阻塞队列中维护任务顺序是基于 compareTo 比较的,比较两个任务的顺序会用 time 相减。那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareTo 产生错误的结果
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
// 判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出
// 否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱
// 不然就把当前 delay 值给调整为 Long.MAX_VALUE + 队首 delay
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}- scheduleAtFixedRate():定时执行,一次任务的启动到下一次任务的启动的间隔
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 任务封装,【指定初始的延迟时间和周期时间】
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(period));
// 默认返回本身
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 开始执行这个任务
delayedExecute(t);
return t;
}- scheduleWithFixedDelay():定时执行,一次任务的结束到下一次任务的启动的间隔
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 任务封装,【指定初始的延迟时间和周期时间】,周期时间为 - 表示是 fixed-delay 模式
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}运行任务
- delayedExecute():校验线程池状态,延迟或周期性任务的主要执行方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池是 SHUTDOWN 状态,需要执行拒绝策略
if (isShutdown())
reject(task);
else {
// 把当前任务放入阻塞队列,因为需要【获取执行时间最近的】,当前任务需要比较
super.getQueue().add(task);
// 线程池状态为 SHUTDOWN 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
// 可以执行
ensurePrestart();
}
}- ensurePrestart():开启线程执行任务
// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// worker数目小于corePoolSize,则添加一个worker。
if (wc < corePoolSize)
// 第二个参数 true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize
addWorker(null, true);
// corePoolSize = 0的情况,至少开启一个线程,【担保机制】
else if (wc == 0)
addWorker(null, false);
}- canRunInCurrentRunState():任务运行时都会被调用以校验当前状态是否可以运行任务
boolean canRunInCurrentRunState(boolean periodic) {
// 根据是否是周期任务判断,在线程池 shutdown 后是否继续执行该任务,默认非周期任务是继续执行的
return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}- onShutdown():删除并取消工作队列中的不需要再执行的任务
void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
// shutdown 后是否仍然执行延时任务
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
// shutdown 后是否仍然执行周期任务
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
// 如果两者皆不可,则对队列中【所有任务】调用 cancel 取消并清空队列
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
// 不需要执行的任务删除并取消,已经取消的任务也需要从队列中删除
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) {
if (q.remove(t))
t.cancel(false);
}
}
}
}
// 因为任务被从队列中清理掉,所以需要调用 tryTerminate 尝试【改变线程池的状态】
tryTerminate();
}ForkJoin
Fork/Join:线程池的实现,体现是分治思想,适用于能够进行任务拆分的 CPU 密集型运算,用于并行计算
任务拆分:将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,把每个任务的分解和合并交给不同的线程来完成,提升了运算效率
ForkJoin 使用 ForkJoinPool 来启动,是一个特殊的线程池,默认会创建与 CPU 核心数大小相同的线程池
任务有返回值继承 RecursiveTask,没有返回值继承 RecursiveAction
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new MyTask(5)));
//拆分 5 + MyTask(4) --> 4 + MyTask(3) -->
}
// 1~ n 之间整数的和
class MyTask extends RecursiveTask<Integer> {
private int n;
public MyTask(int n) {
this.n = n;
}
@Override
public String toString() {
return "MyTask{" + "n=" + n + '}';
}
@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
return n;
}
// 将任务进行拆分(fork)
MyTask t1 = new MyTask(n - 1);
t1.fork();
// 合并(join)结果
int result = n + t1.join();
return result;
}
}继续拆分优化:
class AddTask extends RecursiveTask<Integer> {
int begin;
int end;
public AddTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
public String toString() {
return "{" + begin + "," + end + '}';
}
@Override
protected Integer compute() {
// 5, 5
if (begin == end) {
return begin;
}
// 4, 5 防止多余的拆分 提高效率
if (end - begin == 1) {
return end + begin;
}
// 1 5
int mid = (end + begin) / 2; // 3
AddTask t1 = new AddTask(begin, mid); // 1,3
t1.fork();
AddTask t2 = new AddTask(mid + 1, end); // 4,5
t2.fork();
int result = t1.join() + t2.join();
return result;
}
}ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率:
每个线程都维护了一个双端队列,用来存储需要执行的任务
工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行
窃取的必须是最晚的任务,避免和队列所属线程发生竞争,但是队列中只有一个任务时还是会发生竞争
享元模式
享元模式(Flyweight pattern): 用于减少创建对象的数量,以减少内存占用和提高性能,这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结构的方式
异步模式:让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务,也可将其归类为分工模式,典型实现就是线程池
工作机制:享元模式尝试重用现有的同类对象,如果未找到匹配的对象,则创建新对象
自定义连接池:
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection con = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(con);
}).start();
}
}
class Pool {
//连接池的大小
private final int poolSize;
//连接对象的数组
private Connection[] connections;
//连接状态数组 0表示空闲 1表示繁忙
private AtomicIntegerArray states; //int[] -> AtomicIntegerArray
//构造方法
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i + 1));
}
}
//使用连接
public Connection borrow() {
while (true) {
for (int i = 0; i < poolSize; i++) {
if (states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
System.out.println(Thread.currentThread().getName() + " borrow " + connections[i]);
return connections[i];
}
}
}
//如果没有空闲连接,当前线程等待
synchronized (this) {
try {
System.out.println(Thread.currentThread().getName() + " wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//归还连接
public void free(Connection con) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == con) {//判断是否是同一个对象
states.set(i, 0);//不用cas的原因是只会有一个线程使用该连接
synchronized (this) {
System.out.println(Thread.currentThread().getName() + " free " + con);
this.notifyAll();
}
break;
}
}
}
}
class MockConnection implements Connection {
private String name;
//.....
}同步器
AQS
核心思想
AQS:AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,许多同步类实现都依赖于该同步器
AQS 用状态属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
独占模式是只有一个线程能够访问资源,如 ReentrantLock
共享模式允许多个线程访问资源,如 Semaphore,ReentrantReadWriteLock 是组合式
AQS 核心思想:
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置锁定状态
请求的共享资源被占用,AQS 用队列实现线程阻塞等待以及被唤醒时锁分配的机制,将暂时获取不到锁的线程加入到队列中
CLH 是一种基于单向链表的高性能、公平的自旋锁,AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配

设计原理
设计原理:
- 获取锁:
while(state 状态不允许获取) { // tryAcquire(arg)
if(队列中还没有此线程) {
入队并阻塞 park
}
}
当前线程出队- 释放锁:
if(state 状态允许了) { // tryRelease(arg)
恢复阻塞的线程(s) unpark
}AbstractQueuedSynchronizer 中 state 设计:
state 使用了 32bit int 来维护同步状态,独占模式 0 表示未加锁状态,大于 0 表示已经加锁状态
private volatile int state;state 使用 volatile 修饰配合 cas 保证其修改时的原子性
state 表示线程重入的次数(独占模式)或者剩余许可数(共享模式)
state API:
protected final int getState():获取 state 状态protected final void setState(int newState):设置 state 状态protected final boolean compareAndSetState(int expect,int update):CAS 安全设置 state
封装线程的 Node 节点中 waitstate 设计:
使用 volatile 修饰配合 CAS 保证其修改时的原子性
表示 Node 节点的状态,有以下几种状态:
// 默认为 0
volatile int waitStatus;
// 由于超时或中断,此节点被取消,不会再改变状态
static final int CANCELLED = 1;
// 此节点后面的节点已(或即将)被阻止(通过park),【当前节点在释放或取消时必须唤醒后面的节点】
static final int SIGNAL = -1;
// 此节点当前在条件队列中
static final int CONDITION = -2;
// 将releaseShared传播到其他节点
static final int PROPAGATE = -3;阻塞恢复设计:
使用 park & unpark 来实现线程的暂停和恢复,因为命令的先后顺序不影响结果
park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
park 线程可以通过 interrupt 打断
队列设计:
- 使用了 FIFO 先入先出队列,并不支持优先级队列,同步队列是双向链表,便于出队入队
// 头结点,指向哑元节点
private transient volatile Node head;
// 阻塞队列的尾节点,阻塞队列不包含头结点,从 head.next → tail 认为是阻塞队列
private transient volatile Node tail;
static final class Node {
// 枚举:共享模式
static final Node SHARED = new Node();
// 枚举:独占模式
static final Node EXCLUSIVE = null;
// node 需要构建成 FIFO 队列,prev 指向前继节点
volatile Node prev;
// next 指向后继节点
volatile Node next;
// 当前 node 封装的线程
volatile Thread thread;
// 条件队列是单向链表,只有后继指针,条件队列使用该属性
Node nextWaiter;
}
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet,条件队列是单向链表
public class ConditionObject implements Condition, java.io.Serializable {
// 指向条件队列的第一个 node 节点
private transient Node firstWaiter;
// 指向条件队列的最后一个 node 节点
private transient Node lastWaiter;
}模板对象
同步器的设计是基于模板方法模式,该模式是基于继承的,主要是为了在不改变模板结构的前提下在子类中重新定义模板中的内容以实现复用代码
使用者继承
AbstractQueuedSynchronizer并重写指定的方法将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,这些模板方法会调用使用者重写的方法
AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:
isHeldExclusively() //该线程是否正在独占资源。只有用到condition才需要去实现它
tryAcquire(int) //独占方式。尝试获取资源,成功则返回true,失败则返回false
tryRelease(int) //独占方式。尝试释放资源,成功则返回true,失败则返回false
tryAcquireShared(int) //共享方式。尝试获取资源。负数表示失败;0表示成功但没有剩余可用资源;正数表示成功且有剩余资源
tryReleaseShared(int) //共享方式。尝试释放资源,成功则返回true,失败则返回false
默认情况下,每个方法都抛出
UnsupportedOperationException这些方法的实现必须是内部线程安全的
AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用
自定义
自定义一个不可重入锁:
class MyLock implements Lock {
//独占锁 不可重入
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 加上锁 设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override //解锁
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);//volatile 修饰的变量放在后面,防止指令重排
return true;
}
@Override //是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override //加锁(不成功进入等待队列等待)
public void lock() {
sync.acquire(1);
}
@Override //加锁 可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override //尝试加锁,尝试一次
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override //尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override //解锁
public void unlock() {
sync.release(1);
}
@Override //条件变量
public Condition newCondition() {
return sync.newCondition();
}
}ReentrantLock
锁对比
ReentrantLock 相对于 synchronized 具备如下特点:
锁的实现:synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的
性能:新版本 Java 对 synchronized 进行了很多优化,synchronized 与 ReentrantLock 大致相同
使用:ReentrantLock 需要手动解锁,synchronized 执行完代码块自动解锁
可中断:ReentrantLock 可中断,而 synchronized 不行
公平锁:公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁
ReentrantLock 可以设置公平锁,synchronized 中的锁是非公平的
不公平锁的含义是阻塞队列内公平,队列外非公平
锁超时:尝试获取锁,超时获取不到直接放弃,不进入阻塞队列
- ReentrantLock 可以设置超时时间,synchronized 会一直等待
锁绑定多个条件:一个 ReentrantLock 可以同时绑定多个 Condition 对象,更细粒度的唤醒线程
两者都是可重入锁
使用锁
构造方法:ReentrantLock lock = new ReentrantLock();
ReentrantLock 类 API:
public void lock():获得锁如果锁没有被另一个线程占用,则将锁定计数设置为 1
如果当前线程已经保持锁定,则保持计数增加 1
如果锁被另一个线程保持,则当前线程被禁用线程调度,并且在锁定已被获取之前处于休眠状态
public void unlock():尝试释放锁如果当前线程是该锁的持有者,则保持计数递减
如果保持计数现在为零,则锁定被释放
如果当前线程不是该锁的持有者,则抛出异常
基本语法:
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}公平锁
基本使用
构造方法:ReentrantLock lock = new ReentrantLock(true)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//ReentrantLock 默认是不公平的:
public ReentrantLock() {
sync = new NonfairSync();
}
//说明:公平锁一般没有必要,会降低并发度非公原理
加锁
NonfairSync 继承自 AQS
public void lock() {
sync.lock();
}- 没有竞争:ExclusiveOwnerThread 属于 Thread-0,state 设置为 1
// ReentrantLock.NonfairSync#lock
final void lock() {
// 用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示【获得了独占锁】
if (compareAndSetState(0, 1))
// 设置当前线程为独占线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//失败进入
}- 第一个竞争出现:Thread-1 执行,CAS 尝试将 state 由 0 改为 1,结果失败(第一次),进入 acquire 逻辑
// AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
// tryAcquire 尝试获取锁失败时, 会调用 addWaiter 将当前线程封装成node入队,acquireQueued 阻塞当前线程,
// acquireQueued 返回 true 表示挂起过程中线程被中断唤醒过,false 表示未被中断过
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果线程被中断了逻辑来到这,完成一次真正的打断效果
selfInterrupt();
}
进入 tryAcquire 尝试获取锁逻辑,这时 state 已经是1,结果仍然失败(第二次),加锁成功有两种情况:
当前 AQS 处于无锁状态
加锁线程就是当前线程,说明发生了锁重入
// ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 抢占成功返回 true,抢占失败返回 false
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// state 值
int c = getState();
// 条件成立说明当前处于【无锁状态】
if (c == 0) {
//如果还没有获得锁,尝试用cas获得,这里体现非公平性: 不去检查 AQS 队列是否有阻塞线程直接获取锁
if (compareAndSetState(0, acquires)) {
// 获取锁成功设置当前线程为独占锁线程。
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经有线程获得了锁, 独占锁线程还是当前线程, 表示【发生了锁重入】
else if (current == getExclusiveOwnerThread()) {
// 更新锁重入的值
int nextc = c + acquires;
// 越界判断,当重入的深度很深时,会导致 nextc < 0,int值达到最大之后再 + 1 变负数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新 state 的值,这里不使用 cas 是因为当前线程正在持有锁,所以这里的操作相当于在一个管程内
setState(nextc);
return true;
}
// 获取失败
return false;
}接下来进入 addWaiter 逻辑,构造 Node 队列(不是阻塞队列),前置条件是当前线程获取锁失败,说明有线程占用了锁
图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
Node 的创建是懒惰的,其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
// AbstractQueuedSynchronizer#addWaiter,返回当前线程的 node 节点
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 快速入队,如果 tail 不为 null,说明存在队列
if (pred != null) {
// 将当前节点的前驱节点指向 尾节点
node.prev = pred;
// 通过 cas 将 Node 对象加入 AQS 队列,成为尾节点,【尾插法】
if (compareAndSetTail(pred, node)) {
pred.next = node;// 双向链表
return node;
}
}
// 初始时队列为空,或者 CAS 失败进入这里
enq(node);
return node;
}
// AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {
// 自旋入队,必须入队成功才结束循环
for (;;) {
Node t = tail;
// 说明当前锁被占用,且当前线程可能是【第一个获取锁失败】的线程,【还没有建立队列】
if (t == null) {
// 设置一个【哑元节点】,头尾指针都指向该节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 自旋到这,普通入队方式,首先赋值尾节点的前驱节点【尾插法】
node.prev = t;
// 【在设置完尾节点后,才更新的原始尾节点的后继节点,所以此时从前往后遍历会丢失尾节点】
if (compareAndSetTail(t, node)) {
//【此时 t.next = null,并且这里已经 CAS 结束,线程并不是安全的】
t.next = node;
return t; // 返回当前 node 的前驱节点
}
}
}
}
线程节点加入队列成功,进入 AbstractQueuedSynchronizer#acquireQueued 逻辑阻塞线程
acquireQueued 会在一个自旋中不断尝试获得锁,失败后进入 park 阻塞
如果当前线程是在 head 节点后,会再次 tryAcquire 尝试获取锁,state 仍为 1 则失败(第三次)
final boolean acquireQueued(final Node node, int arg) {
// true 表示当前线程抢占锁失败,false 表示成功
boolean failed = true;
try {
// 中断标记,表示当前线程是否被中断
boolean interrupted = false;
for (;;) {
// 获得当前线程节点的前驱节点
final Node p = node.predecessor();
// 前驱节点是 head, FIFO 队列的特性表示轮到当前线程可以去获取锁
if (p == head && tryAcquire(arg)) {
// 获取成功, 设置当前线程自己的 node 为 head
setHead(node);
p.next = null; // help GC
// 表示抢占锁成功
failed = false;
// 返回当前线程是否被中断
return interrupted;
}
// 判断是否应当 park,返回 false 后需要新一轮的循环,返回 true 进入条件二阻塞线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 条件二返回结果是当前线程是否被打断,没有被打断返回 false 不进入这里的逻辑
// 【就算被打断了,也会继续循环,并不会返回】
interrupted = true;
}
} finally {
// 【可打断模式下才会进入该逻辑】
if (failed)
cancelAcquire(node);
}
}- 进入 shouldParkAfterFailedAcquire 逻辑,**将前驱 node 的 waitStatus 改为 -1**,返回 false;waitStatus 为 -1 的节点用来唤醒下一个节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 表示前置节点是个可以唤醒当前节点的节点,返回 true
if (ws == Node.SIGNAL)
return true;
// 前置节点的状态处于取消状态,需要【删除前面所有取消的节点】, 返回到外层循环重试
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 获取到非取消的节点,连接上当前节点
pred.next = node;
// 默认情况下 node 的 waitStatus 是 0,进入这里的逻辑
} else {
// 【设置上一个节点状态为 Node.SIGNAL】,返回外层循环重试
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回不应该 park,再次尝试一次
return false;
}- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,这时 state 仍为 1 获取失败(第四次)
- 当再次进入 shouldParkAfterFailedAcquire 时,这时其前驱 node 的 waitStatus 已经是 -1 了,返回 true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程,如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// 判断当前线程是否被打断,清除打断标记
return Thread.interrupted();
}- 再有多个线程经历竞争失败后:

解锁
ReentrantLock#unlock:释放锁
public void unlock() {
sync.release(1);
}Thread-0 释放锁,进入 release 流程
进入 tryRelease,设置 exclusiveOwnerThread 为 null,state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor
// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
// 尝试释放锁,tryRelease 返回 true 表示当前线程已经【完全释放锁,重入的释放了】
if (tryRelease(arg)) {
// 队列头节点
Node h = head;
// 头节点什么时候是空?没有发生锁竞争,没有竞争线程创建哑元节点
// 条件成立说明阻塞队列有等待线程,需要唤醒 head 节点后面的线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}// ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
// 减去释放的值,可能重入
int c = getState() - releases;
// 如果当前线程不是持有锁的线程直接报错
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否已经完全释放锁
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才完全释放锁成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 当前线程就是持有锁线程,所以可以直接更新锁,不需要使用 CAS
setState(c);
return free;
}进入 AbstractQueuedSynchronizer#unparkSuccessor 方法,唤醒当前节点的后继节点
找到队列中距离 head 最近的一个没取消的 Node,unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
private void unparkSuccessor(Node node) {
// 当前节点的状态
int ws = node.waitStatus;
if (ws < 0)
// 【尝试重置状态为 0】,因为当前节点要完成对后续节点的唤醒任务了,不需要 -1 了
compareAndSetWaitStatus(node, ws, 0);
// 找到需要 unpark 的节点,当前节点的下一个
Node s = node.next;
// 已取消的节点不能唤醒,需要找到距离头节点最近的非取消的节点
if (s == null || s.waitStatus > 0) {
s = null;
// AQS 队列【从后至前】找需要 unpark 的节点,直到 t == 当前的 node 为止,找不到就不唤醒了
for (Node t = tail; t != null && t != node; t = t.prev)
// 说明当前线程状态需要被唤醒
if (t.waitStatus <= 0)
// 置换引用
s = t;
}
// 【找到合适的可以被唤醒的 node,则唤醒线程】
if (s != null)
LockSupport.unpark(s.thread);
}**从后向前的唤醒的原因**:enq 方法中,节点是尾插法,首先赋值的是尾节点的前驱节点,此时前驱节点的 next 并没有指向尾节点,从前遍历会丢失尾节点
唤醒的线程会从 park 位置开始执行,如果加锁成功(没有竞争),会设置
exclusiveOwnerThread 为 Thread-1,state = 1
head 指向刚刚 Thread-1 所在的 Node,该 Node 会清空 Thread
原本的 head 因为从链表断开,而可被垃圾回收(图中有错误,原来的头节点的 waitStatus 被改为 0 了)

如果这时有其它线程来竞争**(非公平)**,例如这时有 Thread-4 来了并抢占了锁
Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

公平原理
与非公平锁主要区别在于 tryAcquire 方法:先检查 AQS 队列中是否有前驱节点,没有才去 CAS 竞争
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有(false)才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 锁重入
return false;
}
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 头尾指向一个节点,链表为空,返回false
return h != t &&
// 头尾之间有节点,判断头节点的下一个是不是空
// 不是空进入最后的判断,第二个节点的线程是否是本线程,不是返回 true,表示当前节点有前驱节点
((s = h.next) == null || s.thread != Thread.currentThread());
}可重入
可重入是指同一个线程如果首次获得了这把锁,那么它是这把锁的拥有者,因此有权利再次获取这把锁,如果不可重入锁,那么第二次获得锁时,自己也会被锁挡住,直接造成死锁
源码解析参考:nonfairTryAcquire(int acquires)) 和 tryRelease(int releases)
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " execute method2");
} finally {
lock.unlock();
}
}在 Lock 方法加两把锁会是什么情况呢?
加锁两次解锁两次:正常执行
加锁两次解锁一次:程序直接卡死,线程不能出来,也就说明申请几把锁,最后需要解除几把锁
加锁一次解锁两次:运行程序会直接报错
public void getLock() {
lock.lock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t get Lock");
} finally {
lock.unlock();
//lock.unlock();
}
}可打断
基本使用
public void lockInterruptibly():获得可打断的锁
如果没有竞争此方法就会获取 lock 对象锁
如果有竞争就进入阻塞队列,可以被其他线程用 interrupt 打断
注意:如果是不可中断模式,那么即使使用了 interrupt 也不会让等待状态中的线程中断
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
System.out.println("尝试获取锁");
lock.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println("没有获取到锁,被打断,直接返回");
return;
}
try {
System.out.println("获取到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
t1.start();
Thread.sleep(2000);
System.out.println("主线程进行打断锁");
t1.interrupt();
}实现原理
- 不可打断模式:即使它被打断,仍会驻留在 AQS 阻塞队列中,一直要等到获得锁后才能得知自己被打断了
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//阻塞等待
// 如果acquireQueued返回true,打断状态 interrupted = true
selfInterrupt();
}
static void selfInterrupt() {
// 知道自己被打断了,需要重新产生一次中断完成中断效果
Thread.currentThread().interrupt();
}
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 还是需要获得锁后, 才能返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
// 条件二中判断当前线程是否被打断,被打断返回true,设置中断标记为 true,【获取锁后返回】
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程,如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// 判断当前线程是否被打断,清除打断标记,被打断返回true
return Thread.interrupted();
}- 可打断模式:AbstractQueuedSynchronizer#acquireInterruptibly,被打断后会直接抛出异常
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg) {
// 被其他线程打断了直接返回 false
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 没获取到锁,进入这里
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 返回封装当前线程的节点
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//...
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 【在 park 过程中如果被 interrupt 会抛出异常】, 而不会再次进入循环获取锁后才完成打断效果
throw new InterruptedException();
}
} finally {
// 抛出异常前会进入这里
if (failed)
// 取消当前线程的节点
cancelAcquire(node);
}
}
// 取消节点出队的逻辑
private void cancelAcquire(Node node) {
// 判空
if (node == null)
return;
// 把当前节点封装的 Thread 置为空
node.thread = null;
// 获取当前取消的 node 的前驱节点
Node pred = node.prev;
// 前驱节点也被取消了,循环找到前面最近的没被取消的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取前驱节点的后继节点,可能是当前 node,也可能是 waitStatus > 0 的节点
Node predNext = pred.next;
// 把当前节点的状态设置为 【取消状态 1】
node.waitStatus = Node.CANCELLED;
// 条件成立说明当前节点是尾节点,把当前节点的前驱节点设置为尾节点
if (node == tail && compareAndSetTail(node, pred)) {
// 把前驱节点的后继节点置空,这里直接把所有的取消节点出队
compareAndSetNext(pred, predNext, null);
} else {
// 说明当前节点不是 tail 节点
int ws;
// 条件一成立说明当前节点不是 head.next 节点
if (pred != head &&
// 判断前驱节点的状态是不是 -1,不成立说明前驱状态可能是 0 或者刚被其他线程取消排队了
((ws = pred.waitStatus) == Node.SIGNAL ||
// 如果状态不是 -1,设置前驱节点的状态为 -1
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 前驱节点的线程不为null
pred.thread != null) {
Node next = node.next;
// 当前节点的后继节点是正常节点
if (next != null && next.waitStatus <= 0)
// 把 前驱节点的后继节点 设置为 当前节点的后继节点,【从队列中删除了当前节点】
compareAndSetNext(pred, predNext, next);
} else {
// 当前节点是 head.next 节点,唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}锁超时
基本使用
public boolean tryLock():尝试获取锁,获取到返回 true,获取不到直接放弃,不进入阻塞队列
public boolean tryLock(long timeout, TimeUnit unit):在给定时间内获取锁,获取不到就退出
注意:tryLock 期间也可以被打断
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
if (!lock.tryLock(2, TimeUnit.SECONDS)) {
System.out.println("获取不到锁");
return;
}
} catch (InterruptedException e) {
System.out.println("被打断,获取不到锁");
return;
}
try {
log.debug("获取到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
System.out.println("主线程获取到锁");
t1.start();
Thread.sleep(1000);
try {
System.out.println("主线程释放了锁");
} finally {
lock.unlock();
}
}实现原理
成员变量:指定超时限制的阈值,小于该值的线程不会被挂起
static final long spinForTimeoutThreshold = 1000L;超时时间设置的小于该值,就会被禁止挂起,因为阻塞在唤醒的成本太高,不如选择自旋空转
tryLock()
public boolean tryLock() {
// 只尝试一次
return sync.nonfairTryAcquire(1);
}- tryLock(long timeout, TimeUnit unit)
public final boolean tryAcquireNanos(int arg, long nanosTimeout) {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquire 尝试一次
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
private boolean doAcquireNanos(int arg, long nanosTimeout) {
if (nanosTimeout <= 0L)
return false;
// 获取最后期限的时间戳
final long deadline = System.nanoTime() + nanosTimeout;
//...
try {
for (;;) {
//...
// 计算还需等待的时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) //时间已到
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果 nanosTimeout 大于该值,才有阻塞的意义,否则直接自旋会好点
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 【被打断会报异常】
if (Thread.interrupted())
throw new InterruptedException();
}
}
}哲学家就餐
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");//...
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public void run() {
while (true) {
// 尝试获得左手筷子
if (left.tryLock()) {
try {
// 尝试获得右手筷子
if (right.tryLock()) {
try {
System.out.println("eating...");
Thread.sleep(1000);
} finally {
right.unlock();
}
}
} finally {
left.unlock();
}
}
}
}
}
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}条件变量
基本使用
synchronized 的条件变量,是当条件不满足时进入 WaitSet 等待;ReentrantLock 的条件变量比 synchronized 强大之处在于支持多个条件变量
ReentrantLock 类获取 Condition 对象:public Condition newCondition()
Condition 类 API:
void await():当前线程从运行状态进入等待状态,释放锁void signal():唤醒一个等待在 Condition 上的线程,但是必须获得与该 Condition 相关的锁
使用流程:
await / signal 前需要获得锁
await 执行后,会释放锁进入 ConditionObject 等待
await 的线程被唤醒去重新竞争 lock 锁
线程在条件队列被打断会抛出中断异常
竞争 lock 锁成功后,从 await 后继续执行
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
//创建一个新的条件变量
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
new Thread(() -> {
try {
lock.lock();
System.out.println("进入等待");
//进入休息室等待
condition1.await();
System.out.println("被唤醒了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
Thread.sleep(1000);
//叫醒
new Thread(() -> {
try {
lock.lock();
//唤醒
condition2.signal();
} finally {
lock.unlock();
}
}).start();
}实现原理
await
总体流程是将 await 线程包装成 node 节点放入 ConditionObject 的条件队列,如果被唤醒就将 node 转移到 AQS 的执行阻塞队列,等待获取锁,每个 Condition 对象都包含一个等待队列
- 开始 Thread-0 持有锁,调用 await,线程进入 ConditionObject 等待,直到被唤醒或打断,调用 await 方法的线程都是持锁状态的,所以说逻辑里不存在并发
public final void await() throws InterruptedException {
// 判断当前线程是否是中断状态,是就直接给个中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 将调用 await 的线程包装成 Node,添加到条件队列并返回
Node node = addConditionWaiter();
// 完全释放节点持有的锁,因为其他线程唤醒当前线程的前提是【持有锁】
int savedState = fullyRelease(node);
// 设置打断模式为没有被打断,状态码为 0
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 阻塞队列, park 阻塞,等待进入阻塞队列
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果被打断,退出等待队列,对应的 node 【也会被迁移到阻塞队列】尾部,状态设置为 0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 逻辑到这说明当前线程退出等待队列,进入【阻塞队列】
// 尝试枪锁,释放了多少锁就【重新获取多少锁】,获取锁成功判断打断模式
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// node 在条件队列时 如果被外部线程中断唤醒,会加入到阻塞队列,但是并未设 nextWaiter = null
if (node.nextWaiter != null)
// 清理条件队列内所有已取消的 Node
unlinkCancelledWaiters();
// 条件成立说明挂起期间发生过中断
if (interruptMode != 0)
// 应用打断模式
reportInterruptAfterWait(interruptMode);
}
// 打断模式 - 在退出等待时重新设置打断状态
private static final int REINTERRUPT = 1;
// 打断模式 - 在退出等待时抛出异常
private static final int THROW_IE = -1;
- 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
private Node addConditionWaiter() {
// 获取当前条件队列的尾节点的引用,保存到局部变量 t 中
Node t = lastWaiter;
// 当前队列中不是空,并且节点的状态不是 CONDITION(-2),说明当前节点发生了中断
if (t != null && t.waitStatus != Node.CONDITION) {
// 清理条件队列内所有已取消的 Node
unlinkCancelledWaiters();
// 清理完成重新获取 尾节点 的引用
t = lastWaiter;
}
// 创建一个关联当前线程的新 node, 设置状态为 CONDITION(-2),添加至队列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node; // 空队列直接放在队首【不用CAS因为执行线程是持锁线程,并发安全】
else
t.nextWaiter = node; // 非空队列队尾追加
lastWaiter = node; // 更新队尾的引用
return node;
}
// 清理条件队列内所有已取消(不是CONDITION)的 node,【链表删除的逻辑】
private void unlinkCancelledWaiters() {
// 从头节点开始遍历【FIFO】
Node t = firstWaiter;
// 指向正常的 CONDITION 节点
Node trail = null;
// 等待队列不空
while (t != null) {
// 获取当前节点的后继节点
Node next = t.nextWaiter;
// 判断 t 节点是不是 CONDITION 节点,条件队列内不是 CONDITION 就不是正常的
if (t.waitStatus != Node.CONDITION) {
// 不是正常节点,需要 t 与下一个节点断开
t.nextWaiter = null;
// 条件成立说明遍历到的节点还未碰到过正常节点
if (trail == null)
// 更新 firstWaiter 指针为下个节点
firstWaiter = next;
else
// 让上一个正常节点指向 当前取消节点的 下一个节点,【删除非正常的节点】
trail.nextWaiter = next;
// t 是尾节点了,更新 lastWaiter 指向最后一个正常节点
if (next == null)
lastWaiter = trail;
} else {
// trail 指向的是正常节点
trail = t;
}
// 把 t.next 赋值给 t,循环遍历
t = next;
}
}- 接下来 Thread-0 进入 AQS 的 fullyRelease 流程,释放同步器上的锁
// 线程可能重入,需要将 state 全部释放
final int fullyRelease(Node node) {
// 完全释放锁是否成功,false 代表成功
boolean failed = true;
try {
// 获取当前线程所持有的 state 值总数
int savedState = getState();
// release -> tryRelease 解锁重入锁
if (release(savedState)) {
// 释放成功
failed = false;
// 返回解锁的深度
return savedState;
} else {
// 解锁失败抛出异常
throw new IllegalMonitorStateException();
}
} finally {
// 没有释放成功,将当前 node 设置为取消状态
if (failed)
node.waitStatus = Node.CANCELLED;
}
}- fullyRelease 中会 unpark AQS 队列中的下一个节点竞争锁,假设 Thread-1 竞争成功

- Thread-0 进入 isOnSyncQueue 逻辑判断节点是否移动到阻塞队列,没有就 park 阻塞 Thread-0
final boolean isOnSyncQueue(Node node) {
// node 的状态是 CONDITION,signal 方法是先修改状态再迁移,所以前驱节点为空证明还【没有完成迁移】
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其它 node,因为条件队列的 next 指针为 null
if (node.next != null)
return true;
// 说明【可能在阻塞队列,但是是尾节点】
// 从阻塞队列的尾节点开始向前【遍历查找 node】,如果查找到返回 true,查找不到返回 false
return findNodeFromTail(node);
}- await 线程 park 后如果被 unpark 或者被打断,都会进入 checkInterruptWhileWaiting 判断线程是否被打断:在条件队列被打断的线程需要抛出异常
private int checkInterruptWhileWaiting(Node node) {
// Thread.interrupted() 返回当前线程中断标记位,并且重置当前标记位 为 false
// 如果被中断了,根据是否在条件队列被中断的,设置中断状态码
return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
// 这个方法只有在线程是被打断唤醒时才会调用
final boolean transferAfterCancelledWait(Node node) {
// 条件成立说明当前node一定是在条件队列内,因为 signal 迁移节点到阻塞队列时,会将节点的状态修改为 0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 把【中断唤醒的 node 加入到阻塞队列中】
enq(node);
// 表示是在条件队列内被中断了,设置为 THROW_IE 为 -1
return true;
}
//执行到这里的情况:
//1.当前node已经被外部线程调用 signal 方法将其迁移到 阻塞队列 内了
//2.当前node正在被外部线程调用 signal 方法将其迁移至 阻塞队列 进行中状态
// 如果当前线程还没到阻塞队列,一直释放 CPU
while (!isOnSyncQueue(node))
Thread.yield();
// 表示当前节点被中断唤醒时不在条件队列了,设置为 REINTERRUPT 为 1
return false;
}- 最后开始处理中断状态:
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
// 条件成立说明【在条件队列内发生过中断,此时 await 方法抛出中断异常】
if (interruptMode == THROW_IE)
throw new InterruptedException();
// 条件成立说明【在条件队列外发生的中断,此时设置当前线程的中断标记位为 true】
else if (interruptMode == REINTERRUPT)
// 进行一次自己打断,产生中断的效果
selfInterrupt();
}signal
- 假设 Thread-1 要来唤醒 Thread-0,进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node,必须持有锁才能唤醒, 因此 doSignal 内线程安全
public final void signal() {
// 判断调用 signal 方法的线程是否是独占锁持有线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取条件队列中第一个 Node
Node first = firstWaiter;
// 不为空就将第该节点【迁移到阻塞队列】
if (first != null)
doSignal(first);
}
// 唤醒 - 【将没取消的第一个节点转移至 AQS 队列尾部】
private void doSignal(Node first) {
do {
// 成立说明当前节点的下一个节点是 null,当前节点是尾节点了,队列中只有当前一个节点了
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 将等待队列中的 Node 转移至 AQS 队列,不成功且还有节点则继续循环
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// signalAll() 会调用这个函数,唤醒所有的节点
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
// 唤醒所有的节点,都放到阻塞队列中
} while (first != null);
}- 执行 transferForSignal,先将节点的 waitStatus 改为 0,然后加入 AQS 阻塞队列尾部,将 Thread-3 的 waitStatus 改为 -1
// 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
final boolean transferForSignal(Node node) {
// CAS 修改当前节点的状态,修改为 0,因为当前节点马上要迁移到阻塞队列了
// 如果状态已经不是 CONDITION, 说明线程被取消(await 释放全部锁失败)或者被中断(可打断 cancelAcquire)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 返回函数调用处继续寻找下一个节点
return false;
// 【先改状态,再进行迁移】
// 将当前 node 入阻塞队列,p 是当前节点在阻塞队列的【前驱节点】
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驱节点被取消或者不能设置状态为 Node.SIGNAL,就 unpark 取消当前节点线程的阻塞状态,
// 让 thread-0 线程竞争锁,重新同步状态
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- Thread-1 释放锁,进入 unlock 流程
ReadWrite
读写锁
独占锁:指该锁一次只能被一个线程所持有,对 ReentrantLock 和 Synchronized 而言都是独占锁
共享锁:指该锁可以被多个线程锁持有
ReentrantReadWriteLock 其读锁是共享锁,写锁是独占锁
作用:多个线程同时读一个资源类没有任何问题,为了满足并发量,读取共享资源应该同时进行,但是如果一个线程想去写共享资源,就不应该再有其它线程可以对该资源进行读或写
使用规则:
- 加锁解锁格式:
r.lock();
try {
// 临界区
} finally {
r.unlock();
}读-读能共存、读-写不能共存、写-写不能共存
读锁不支持条件变量
重入时升级不支持:持有读锁的情况下去获取写锁会导致获取写锁永久等待,需要先释放读,再去获得写
重入时降级支持:持有写锁的情况下去获取读锁,造成只有当前线程会持有读锁,因为写锁会互斥其他的锁
w.lock();
try {
r.lock();// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
try {
// ...
} finally{
w.unlock();// 要在写锁释放之前获取读锁
}
} finally{
r.unlock();
}构造方法:
public ReentrantReadWriteLock():默认构造方法,非公平锁public ReentrantReadWriteLock(boolean fair):true 为公平锁
常用API:
public ReentrantReadWriteLock.ReadLock readLock():返回读锁public ReentrantReadWriteLock.WriteLock writeLock():返回写锁public void lock():加锁public void unlock():解锁public boolean tryLock():尝试获取锁
读读并发:
public static void main(String[] args) {
ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock r = rw.readLock();
ReentrantReadWriteLock.WriteLock w = rw.writeLock();
new Thread(() -> {
r.lock();
try {
Thread.sleep(2000);
System.out.println("Thread 1 running " + new Date());
} finally {
r.unlock();
}
},"t1").start();
new Thread(() -> {
r.lock();
try {
Thread.sleep(2000);
System.out.println("Thread 2 running " + new Date());
} finally {
r.unlock();
}
},"t2").start();
}缓存应用
缓存更新时,是先清缓存还是先更新数据库
先清缓存:可能造成刚清理缓存还没有更新数据库,线程直接查询了数据库更新过期数据到缓存
先更新据库:可能造成刚更新数据库,还没清空缓存就有线程从缓存拿到了旧数据
补充情况:查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询

可以使用读写锁进行操作
实现原理
成员属性
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个,原理与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位
读写锁:
private final ReentrantReadWriteLock.ReadLock readerLock;private final ReentrantReadWriteLock.WriteLock writerLock;构造方法:默认是非公平锁,可以指定参数创建公平锁
public ReentrantReadWriteLock(boolean fair) {
// true 为公平锁
sync = fair ? new FairSync() : new NonfairSync();
// 这两个 lock 共享同一个 sync 实例,都是由 ReentrantReadWriteLock 的 sync 提供同步实现
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}Sync 类的属性:
- 统计变量:
// 用来移位
static final int SHARED_SHIFT = 16;
// 高16位的1
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 65535,16个1,代表写锁的最大重入次数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 低16位掩码:0b 1111 1111 1111 1111,用来获取写锁重入的次数
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;- 获取读写锁的次数:
// 获取读写锁的读锁分配的总次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写锁(独占)锁的重入次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }- 内部类:
// 记录读锁线程自己的持有读锁的数量(重入次数),因为 state 高16位记录的是全局范围内所有的读线程获取读锁的总量
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
// 线程安全的存放线程各自的 HoldCounter 对象
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}- 内部类实例:
// 当前线程持有的可重入读锁的数量,计数为 0 时删除
private transient ThreadLocalHoldCounter readHolds;
// 记录最后一个获取【读锁】线程的 HoldCounter 对象
private transient HoldCounter cachedHoldCounter;- 首次获取锁:
// 第一个获取读锁的线程
private transient Thread firstReader = null;
// 记录该线程持有的读锁次数(读锁重入次数)
private transient int firstReaderHoldCount;- Sync 构造方法:
Sync() {
readHolds = new ThreadLocalHoldCounter();
// 确保其他线程的数据可见性,state 是 volatile 修饰的变量,重写该值会将线程本地缓存数据【同步至主存】
setState(getState());
}加锁原理
- t1 线程:w.lock(写锁),成功上锁 state = 0_1
// lock() -> sync.acquire(1);
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
// 尝试获得写锁,获得写锁失败,将当前线程关联到一个 Node 对象上, 模式为独占模式
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 获得低 16 位, 代表写锁的 state 计数
int w = exclusiveCount(c);
// 说明有读锁或者写锁
if (c != 0) {
// c != 0 and w == 0 表示有读锁,【读锁不能升级】,直接返回 false
// w != 0 说明有写锁,写锁的拥有者不是自己,获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 执行到这里只有一种情况:【写锁重入】,所以下面几行代码不存在并发
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写锁重入, 获得锁成功,没有并发,所以不使用 CAS
setState(c + acquires);
return true;
}
// c == 0,说明没有任何锁,判断写锁是否该阻塞,是 false 就尝试获取锁,失败返回 false
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
// 获得锁成功,设置锁的持有线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
// 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞
final boolean writerShouldBlock() {
return false;
}
// 公平锁会检查 AQS 队列中是否有前驱节点, 没有(false)才去竞争
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}t2 r.lock(读锁),进入 tryAcquireShared 流程:
返回 -1 表示失败
如果返回 0 表示成功
返回正数表示还有多少后继节点支持共享模式,读写锁返回 1
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// tryAcquireShared 返回负数, 表示获取读锁失败
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 尝试以共享模式获取
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// exclusiveCount(c) 代表低 16 位, 写锁的 state,成立说明有线程持有写锁
// 写锁的持有者不是当前线程,则获取读锁失败,【写锁允许降级】
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
// 高 16 位,代表读锁的 state,共享锁分配出去的总次数
int r = sharedCount(c);
// 读锁是否应该阻塞
if (!readerShouldBlock() && r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 尝试增加读锁计数
// 加锁成功
// 加锁之前读锁为 0,说明当前线程是第一个读锁线程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 第一个读锁线程是自己就发生了读锁重入
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// cachedHoldCounter 设置为当前线程的 holdCounter 对象,即最后一个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
// 说明还没设置 rh
if (rh == null || rh.tid != getThreadId(current))
// 获取当前线程的锁重入的对象,赋值给 cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
// 还没重入
else if (rh.count == 0)
readHolds.set(rh);
// 重入 + 1
rh.count++;
}
// 读锁加锁成功
return 1;
}
// 逻辑到这 应该阻塞,或者 cas 加锁失败
// 会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞
return fullTryAcquireShared(current);
}
// 非公平锁 readerShouldBlock 偏向写锁一些,看 AQS 阻塞队列中第一个节点是否是写锁,是则阻塞,反之不阻塞
// 防止一直有读锁线程,导致写锁线程饥饿
// true 则该阻塞, false 则不阻塞
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
final int fullTryAcquireShared(Thread current) {
// 当前读锁线程持有的读锁次数对象
HoldCounter rh = null;
for (;;) {
int c = getState();
// 说明有线程持有写锁
if (exclusiveCount(c) != 0) {
// 写锁不是自己则获取锁失败
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// 条件成立说明当前线程是 firstReader,当前锁是读忙碌状态,而且当前线程也是读锁重入
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
// 最后一个读锁的 HoldCounter
rh = cachedHoldCounter;
// 说明当前线程也不是最后一个读锁
if (rh == null || rh.tid != getThreadId(current)) {
// 获取当前线程的 HoldCounter
rh = readHolds.get();
// 条件成立说明 HoldCounter 对象是上一步代码新建的
// 当前线程不是锁重入,在 readerShouldBlock() 返回 true 时需要去排队
if (rh.count == 0)
// 防止内存泄漏
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 越界判断
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 读锁加锁,条件内的逻辑与 tryAcquireShared 相同
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}- 获取读锁失败,进入 sync.doAcquireShared(1) 流程开始阻塞,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
private void doAcquireShared(int arg) {
// 将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点就头节点就去尝试获取锁
if (p == head) {
// 再一次尝试获取读锁
int r = tryAcquireShared(arg);
// r >= 0 表示获取成功
if (r >= 0) {
//【这里会设置自己为头节点,唤醒相连的后序的共享节点】
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 是否在获取读锁失败时阻塞 park 当前线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,shouldParkAfterFailedAcquire 内把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一次尝试 tryAcquireShared,不成功在 parkAndCheckInterrupt() 处 park

这种状态下,假设又有 t3 r.lock,t4 w.lock,这期间 t1 仍然持有锁,就变成了下面的样子

解锁原理
- t1 w.unlock, 写锁解锁
public void unlock() {
// 释放锁
sync.release(1);
}
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 头节点不为空并且不是等待状态不是 0,唤醒后继的非取消节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 因为可重入的原因, 写锁计数为 0, 才算释放成功
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}唤醒流程 sync.unparkSuccessor,这时 t2 在 doAcquireShared 的 parkAndCheckInterrupt() 处恢复运行,继续循环,执行 tryAcquireShared 成功则让读锁计数加一
接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点;还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒下一个节点,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行,唤醒连续的所有的共享节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置自己为 head 节点
setHead(node);
// propagate 表示有共享资源(例如共享读锁或信号量),为 0 就没有资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取下一个节点
Node s = node.next;
// 如果当前是最后一个节点,或者下一个节点是【等待共享读锁的节点】
if (s == null || s.isShared())
// 唤醒后继节点
doReleaseShared();
}
}
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// SIGNAL 唤醒后继
if (ws == Node.SIGNAL) {
// 因为读锁共享,如果其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
// 防止 unparkSuccessor 被多次执行
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
// 如果已经是 0 了,改为 -3,用来解决传播性
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的 head,
// 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点
if (h == head)
break;
}
}
下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点
t2 读锁解锁,进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但计数还不为零,t3 同样让计数减一,计数为零,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒下一个节点
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程,计数为 0 才是真正释放
if (compareAndSetState(c, nextc))
// 返回是否已经完全释放了
return nextc == 0;
}
}- t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是头节点的临节点,并且没有其他节点竞争,tryAcquire(1) 成功,修改头结点,流程结束

Stamped
StampedLock:读写锁,该类自 JDK 8 加入,是为了进一步优化读性能
特点:
在使用读锁、写锁时都必须配合戳使用
StampedLock 不支持条件变量
StampedLock 不支持重入
基本用法
加解读锁:
long stamp = lock.readLock();
lock.unlockRead(stamp); // 类似于 unpark,解指定的锁加解写锁:
long stamp = lock.writeLock();
lock.unlockWrite(stamp);乐观读,StampedLock 支持
tryOptimisticRead()方法,读取完毕后做一次戳校验,如果校验通过,表示这期间没有其他线程的写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据一致性long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}
提供一个数据容器类内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法:
读-读可以优化
读-写优化读,补加读锁
public static void main(String[] args) throws InterruptedException {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1000);
},"t1").start();
Thread.sleep(500);
new Thread(() -> {
dataContainer.write(1000);
},"t2").start();
}
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public int read(int readTime) throws InterruptedException {
long stamp = lock.tryOptimisticRead();
System.out.println(new Date() + " optimistic read locking" + stamp);
Thread.sleep(readTime);
// 戳有效,直接返回数据
if (lock.validate(stamp)) {
Sout(new Date() + " optimistic read finish..." + stamp);
return data;
}
// 说明其他线程更改了戳,需要锁升级了,从乐观读升级到读锁
System.out.println(new Date() + " updating to read lock" + stamp);
try {
stamp = lock.readLock();
System.out.println(new Date() + " read lock" + stamp);
Thread.sleep(readTime);
System.out.println(new Date() + " read finish..." + stamp);
return data;
} finally {
System.out.println(new Date() + " read unlock " + stamp);
lock.unlockRead(stamp);
}
}
public void write(int newData) {
long stamp = lock.writeLock();
System.out.println(new Date() + " write lock " + stamp);
try {
Thread.sleep(2000);
this.data = newData;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(new Date() + " write unlock " + stamp);
lock.unlockWrite(stamp);
}
}
}CountDown
基本使用
CountDownLatch:计数器,用来进行线程同步协作,等待所有线程完成
构造器:
public CountDownLatch(int count):初始化唤醒需要的 down 几步
常用API:
public void await():让当前线程等待,必须 down 完初始化的数字才可以被唤醒,否则进入无限等待public void countDown():计数器进行减 1(down 1)
应用:同步等待多个 Rest 远程调用结束
// LOL 10人进入游戏倒计时
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
ExecutorService service = Executors.newFixedThreadPool(10);
String[] all = new String[10];
Random random = new Random();
for (int j = 0; j < 10; j++) {
int finalJ = j;//常量
service.submit(() -> {
for (int i = 0; i <= 100; i++) {
Thread.sleep(random.nextInt(100)); //随机休眠
all[finalJ] = i + "%";
System.out.print("\r" + Arrays.toString(all)); // \r代表覆盖
}
latch.countDown();
});
}
latch.await();
System.out.println("\n游戏开始");
service.shutdown();
}
/*
[100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%]
游戏开始*/实现原理
阻塞等待:
- 线程调用 await() 等待其他线程完成任务:支持打断
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 判断线程是否被打断,抛出打断异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取共享锁,条件成立说明 state > 0,此时线程入队阻塞等待,等待其他线程获取共享资源
// 条件不成立说明 state = 0,此时不需要阻塞线程,直接结束函数调用
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}- 线程进入 AbstractQueuedSynchronizer#doAcquireSharedInterruptibly 函数阻塞挂起,等待 latch 变为 0:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 将调用latch.await()方法的线程 包装成 SHARED 类型的 node 加入到 AQS 的阻塞队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 前驱节点时头节点就可以尝试获取锁
if (p == head) {
// 再次尝试获取锁,获取成功返回 1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取锁成功,设置当前节点为 head 节点,并且向后传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 阻塞在这里
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 阻塞线程被中断后抛出异常,进入取消节点的逻辑
if (failed)
cancelAcquire(node);
}
}- 获取共享锁成功,进入唤醒阻塞队列中与头节点相连的 SHARED 模式的节点:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 将当前节点设置为新的 head 节点,前驱节点和持有线程置为 null
setHead(node);
// propagate = 1,条件一成立
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
// 获取当前节点的后继节点
Node s = node.next;
// 当前节点是尾节点时 next 为 null,或者后继节点是 SHARED 共享模式
if (s == null || s.isShared())
// 唤醒所有的等待共享锁的节点
doReleaseShared();
}
}计数减一:
- 线程进入 countDown() 完成计数器减一(释放锁)的操作
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 尝试释放共享锁
if (tryReleaseShared(arg)) {
// 释放锁成功开始唤醒阻塞节点
doReleaseShared();
return true;
}
return false;
}- 更新 state 值,每调用一次,state 值减一,当 state -1 正好为 0 时,返回 true
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
// 条件成立说明前面【已经有线程触发唤醒操作】了,这里返回 false
if (c == 0)
return false;
// 计数器减一
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 计数器为 0 时返回 true
return nextc == 0;
}
}- state = 0 时,当前线程需要执行唤醒阻塞节点的任务
private void doReleaseShared() {
for (;;) {
Node h = head;
// 判断队列是否是空队列
if (h != null && h != tail) {
int ws = h.waitStatus;
// 头节点的状态为 signal,说明后继节点没有被唤醒过
if (ws == Node.SIGNAL) {
// cas 设置头节点的状态为 0,设置失败继续自旋
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
// 如果有其他线程已经设置了头节点的状态,重新设置为 PROPAGATE 传播属性
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的head,
// 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点
if (h == head)
break;
}
}CyclicBarrier
基本使用
CyclicBarrier:循环屏障,用来进行线程协作,等待线程满足某个计数,才能触发自己执行
常用方法:
public CyclicBarrier(int parties, Runnable barrierAction):用于在线程到达屏障 parties 时,执行 barrierActionparties:代表多少个线程到达屏障开始触发线程任务
barrierAction:线程任务
public int await():线程调用 await 方法通知 CyclicBarrier 本线程已经到达屏障
与 CountDownLatch 的区别:CyclicBarrier 是可以重用的
应用:可以实现多线程中,某个任务在等待其他线程执行完毕以后触发
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
System.out.println("task1 task2 finish...");
});
for (int i = 0; i < 3; i++) { // 循环重用
service.submit(() -> {
System.out.println("task1 begin...");
try {
Thread.sleep(1000);
barrier.await(); // 2 - 1 = 1
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(() -> {
System.out.println("task2 begin...");
try {
Thread.sleep(2000);
barrier.await(); // 1 - 1 = 0
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}实现原理
成员属性
- 全局锁:利用可重入锁实现的工具类
// barrier 实现是依赖于Condition条件队列,condition 条件队列必须依赖lock才能使用
private final ReentrantLock lock = new ReentrantLock();
// 线程挂起实现使用的 condition 队列,当前代所有线程到位,这个条件队列内的线程才会被唤醒
private final Condition trip = lock.newCondition();- 线程数量:
private final int parties; // 代表多少个线程到达屏障开始触发线程任务
private int count; // 表示当前“代”还有多少个线程未到位,初始值为 parties当前代中最后一个线程到位后要执行的事件:
private final Runnable barrierCommand;代:
// 表示 barrier 对象当前 代
private Generation generation = new Generation();
private static class Generation {
// 表示当前“代”是否被打破,如果被打破再来到这一代的线程 就会直接抛出 BrokenException 异常
// 且在这一代挂起的线程都会被唤醒,然后抛出 BrokerException 异常。
boolean broken = false;
}- 构造方法:
public CyclicBarrie(int parties, Runnable barrierAction) {
// 因为小于等于 0 的 barrier 没有任何意义
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
// 可以为 null
this.barrierCommand = barrierAction;
}
成员方法
- await():阻塞等待所有线程到位
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// timed:表示当前调用await方法的线程是否指定了超时时长,如果 true 表示线程是响应超时的
// nanos:线程等待超时时长,单位是纳秒
private int dowait(boolean timed, long nanos) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取当前代
final Generation g = generation;
// 【如果当前代是已经被打破状态,则当前调用await方法的线程,直接抛出Broken异常】
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程被中断了,则打破当前代,然后当前线程抛出中断异常
if (Thread.interrupted()) {
// 设置当前代的状态为 broken 状态,唤醒在 trip 条件队列内的线程
breakBarrier();
throw new InterruptedException();
}
// 逻辑到这说明,当前线程中断状态是 false, 当前代的 broken 为 false(未打破状态)
// 假设 parties 给的是 5,那么index对应的值为 4,3,2,1,0
int index = --count;
// 条件成立说明当前线程是最后一个到达 barrier 的线程,【需要开启新代,唤醒阻塞线程】
if (index == 0) {
// 栅栏任务启动标记
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 启动触发的任务
command.run();
// run()未抛出异常的话,启动标记设置为 true
ranAction = true;
// 开启新的一代,这里会【唤醒所有的阻塞队列】
nextGeneration();
// 返回 0 因为当前线程是此代最后一个到达的线程,index == 0
return 0;
} finally {
// 如果 command.run() 执行抛出异常的话,会进入到这里
if (!ranAction)
breakBarrier();
}
}
// 自旋,一直到条件满足、当前代被打破、线程被中断,等待超时
for (;;) {
try {
// 根据是否需要超时等待选择阻塞方法
if (!timed)
// 当前线程释放掉 lock,【进入到 trip 条件队列的尾部挂起自己】,等待被唤醒
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 被中断后来到这里的逻辑
// 当前代没有变化并且没有被打破
if (g == generation && !g.broken) {
// 打破屏障
breakBarrier();
// node 节点在【条件队列】内收到中断信号时 会抛出中断异常
throw ie;
} else {
// 等待过程中代变化了,完成一次自我打断
Thread.currentThread().interrupt();
}
}
// 唤醒后的线程,【判断当前代已经被打破,线程唤醒后依次抛出 BrokenBarrier 异常】
if (g.broken)
throw new BrokenBarrierException();
// 当前线程挂起期间,最后一个线程到位了,然后触发了开启新的一代的逻辑
if (g != generation)
return index;
// 当前线程 trip 中等待超时,然后主动转移到阻塞队列
if (timed && nanos <= 0L) {
breakBarrier();
// 抛出超时异常
throw new TimeoutException();
}
}
} finally {
// 解锁
lock.unlock();
}
}- breakBarrier():打破 Barrier 屏障
private void breakBarrier() {
// 将代中的 broken 设置为 true,表示这一代是被打破了,再来到这一代的线程,直接抛出异常
generation.broken = true;
// 重置 count 为 parties
count = parties;
// 将在trip条件队列内挂起的线程全部唤醒,唤醒后的线程会检查当前是否是打破的,然后抛出异常
trip.signalAll();
}- nextGeneration():开启新的下一代
private void nextGeneration() {
// 将在 trip 条件队列内挂起的线程全部唤醒
trip.signalAll();
// 重置 count 为 parties
count = parties;
// 开启新的一代,使用一个新的generation对象,表示新的一代,新的一代和上一代【没有任何关系】
generation = new Generation();
}Semaphore
基本使用
synchronized 可以起到锁的作用,但某个时间段内,只能有一个线程允许执行
Semaphore(信号量)用来限制能同时访问共享资源的线程上限,非重入锁
构造方法:
public Semaphore(int permits):permits 表示许可线程的数量(state)public Semaphore(int permits, boolean fair):fair 表示公平性,如果设为 true,下次执行的线程会是等待最久的线程
常用API:
public void acquire():表示获取许可public void release():表示释放许可,acquire() 和 release() 方法之间的代码为同步代码
public static void main(String[] args) {
// 1.创建Semaphore对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 3. 获取许可
semaphore.acquire();
sout(Thread.currentThread().getName() + " running...");
Thread.sleep(1000);
sout(Thread.currentThread().getName() + " end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}实现原理
加锁流程:
- Semaphore 的 permits(state)为 3,这时 5 个线程来获取资源
Sync(int permits) {
setState(permits);
}假设其中 Thread-1,Thread-2,Thread-4 CAS 竞争成功,permits 变为 0,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
// acquire() -> sync.acquireSharedInterruptibly(1),可中断
public final void acquireSharedInterruptibly(int arg) {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取通行证,获取成功返回 >= 0的值
if (tryAcquireShared(arg) < 0)
// 获取许可证失败,进入阻塞
doAcquireSharedInterruptibly(arg);
}
// tryAcquireShared() -> nonfairTryAcquireShared()
// 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取 state ,state 这里【表示通行证】
int available = getState();
// 计算当前线程获取通行证完成之后,通行证还剩余数量
int remaining = available - acquires;
// 如果许可已经用完, 返回负数, 表示获取失败,
if (remaining < 0 ||
// 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功
compareAndSetState(available, remaining))
return remaining;
}
}
private void doAcquireSharedInterruptibly(int arg) {
// 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中
final Node node = addWaiter(Node.SHARED);
// 获取标记
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 前驱节点是头节点可以再次获取许可
if (p == head) {
// 再次尝试获取许可,【返回剩余的许可证数量】
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功后本线程出队(AQS), 所在 Node设置为 head
// r 表示【可用资源数】, 为 0 则不会继续传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 被打断后进入该逻辑
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置自己为 head 节点
setHead(node);
// propagate 表示有【共享资源】(例如共享读锁或信号量)
// head waitStatus == Node.SIGNAL 或 Node.PROPAGATE,doReleaseShared 函数中设置的
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果是最后一个节点或者是等待共享读锁的节点,做一次唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
- 这时 Thread-4 释放了 permits,状态如下
// release() -> releaseShared()
public final boolean releaseShared(int arg) {
// 尝试释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前锁资源的可用许可证数量
int current = getState();
int next = current + releases;
// 索引越界判断
if (next < current)
throw new Error("Maximum permit count exceeded");
// 释放锁
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
// PROPAGATE 详解
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
}
- 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,并且 unpark 接下来的共享状态的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
PROPAGATE
假设存在某次循环中队列里排队的结点情况为 head(-1) → t1(-1) → t2(0),存在将要释放信号量的 T3 和 T4,释放顺序为先 T3 后 T4
// 老版本代码
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
// 有空闲资源
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
// 下一个
if (s == null || s.isShared())
unparkSuccessor(node);
}
}正常流程:
T3 调用 releaseShared(1),直接调用了 unparkSuccessor(head),head.waitStatus 从 -1 变为 0
T1 由于 T3 释放信号量被唤醒,然后 T4 释放,唤醒 T2
BUG 流程:
T3 调用 releaseShared(1),直接调用了 unparkSuccessor(head),head.waitStatus 从 -1 变为 0
T1 由于 T3 释放信号量被唤醒,调用 tryAcquireShared,返回值为 0(获取锁成功,但没有剩余资源量)
T1 还没调用 setHeadAndPropagate 方法,T4 调用 releaseShared(1),此时 head.waitStatus 为 0(此时读到的 head 和 1 中为同一个 head),不满足条件,因此不调用 unparkSuccessor(head)
T1 获取信号量成功,调用 setHeadAndPropagate(t1.node, 0) 时,因为不满足 propagate > 0(剩余资源量 == 0),从而不会唤醒后继结点, T2 线程得不到唤醒
更新后流程:
T3 调用 releaseShared(1),直接调用了 unparkSuccessor(head),head.waitStatus 从 -1 变为 0
T1 由于 T3 释放信号量被唤醒,调用 tryAcquireShared,返回值为 0(获取锁成功,但没有剩余资源量)
T1 还没调用 setHeadAndPropagate 方法,T4 调用 releaseShared(),此时 head.waitStatus 为 0(此时读到的 head 和 1 中为同一个 head),调用 doReleaseShared() 将等待状态置为 PROPAGATE(-3)
T1 获取信号量成功,调用 setHeadAndPropagate 时,读到 h.waitStatus < 0,从而调用 doReleaseShared() 唤醒 T2
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置自己为 head 节点
setHead(node);
// propagate 表示有共享资源(例如共享读锁或信号量)
// head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果是最后一个节点或者是等待共享读锁的节点,做一次唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
// 唤醒
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 防止 unparkSuccessor 被多次执行
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
// 如果已经是 0 了,改为 -3,用来解决传播性
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}Exchanger
Exchanger:交换器,是一个用于线程间协作的工具类,用于进行线程间的数据交换
工作流程:两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange 方法,当两个线程都到达同步点时,这两个线程就可以交换数据
常用方法:
public Exchanger():创建一个新的交换器public V exchange(V x):等待另一个线程到达此交换点public V exchange(V x, long timeout, TimeUnit unit):等待一定的时间
public class ExchangerDemo {
public static void main(String[] args) {
// 创建交换对象(信使)
Exchanger<String> exchanger = new Exchanger<>();
new ThreadA(exchanger).start();
new ThreadB(exchanger).start();
}
}
class ThreadA extends Thread{
private Exchanger<String> exchanger();
public ThreadA(Exchanger<String> exchanger){
this.exchanger = exchanger;
}
@Override
public void run() {
try{
sout("线程A,做好了礼物A,等待线程B送来的礼物B");
//如果等待了5s还没有交换就死亡(抛出异常)!
String s = exchanger.exchange("礼物A",5,TimeUnit.SECONDS);
sout("线程A收到线程B的礼物:" + s);
} catch (Exception e) {
System.out.println("线程A等待了5s,没有收到礼物,最终就执行结束了!");
}
}
}
class ThreadB extends Thread{
private Exchanger<String> exchanger;
public ThreadB(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
sout("线程B,做好了礼物B,等待线程A送来的礼物A.....");
// 开始交换礼物。参数是送给其他线程的礼物!
sout("线程B收到线程A的礼物:" + exchanger.exchange("礼物B"));
} catch (Exception e) {
e.printStackTrace();
}
}
}