avatar


8.多线程 [2/2]

这一章,我们会讨论

  1. Java内存模型
  2. 多线程特性
  3. ThreadLocal
  4. 原子类
  5. Lock类
  6. Volatile关键字
  7. 并发容器
  8. 线程池

在上一章《7.多线程 [1/2]》,我们讨论的内容有:

  1. 什么是多线程
  2. 线程创建
  3. 线程传递入参
  4. 线程控制
  5. 线程调度
  6. 线程生命周期
  7. 线程安全
  8. 线程死锁
  9. 线程通信

Java内存模型

在上一章《7.多线程 [1/2]》,我们通过CPU时间片理解了为什么会出现线程不安全,这一章,我们通过讨论Java内存模型,更深刻的理解为什么会线程不安全。

这部分的一些内容,其实在之前的章节中,讨论栈内存和堆内存的时候,都有过很通俗的解释。

Java程序执行流程回顾

早在第一章《1.基础语法》,我们就讨论过,我们写的Java代码,是首先编译成class字节码文件,然后交由JVM去执行的。

Java程序执行流程

如图所示:
首先Java源代码文件会被Java编译器编译为字节码文件。
然后由JVM中的类加载器加载各个类的字节码文件。
加载完毕之后,交由JVM执行引擎执行。
Java内存模型指的就是Runtime Data Area(运行时数据区),即程序执行期间用到的数据和相关信息保存区。

(关于类加载器,我们在《9.类的加载与反射》,有更详细的讨论。)

Java内存模型

根据JVM规范,JVM内存共分为虚拟机栈、堆、方法区、程序计数器、本地方法栈五个部分。结构如下图:

Java内存模型

我们解释一下。

  • 方法区和堆,颜色是一样的。都属于堆,方法区也是堆。但是方法区是堆里的永久区,垃圾回收的概率低,数据稳定。但是Heap堆会频繁的进行垃圾回收。
  • Runtime constant pool,是运行时常量池。是方法区里面的专门存放运行时常量的。
  • JVM Stacks,Java虚拟机栈。也有资料称之为:线程栈、Java栈。每一个线程在执行过程中都会在这个栈里面创建一个栈帧。栈由无数个栈帧构成。
  • 本地方法栈也是一个栈,但不同的是本地方法栈执行的是native方法,也就是Java最底层的方法,由其他语言实现的。而虚拟机栈里的是Java方法。
  • 程序计数器,是帮助执行栈帧里面方法的。

有几个再重点解释一下。

程序计数器

每个线程对应有一个程序计数器。
各线程的程序计数器是线程私有的,互不影响,是线程安全的。
程序计数器记录线程正在执行的内存地址,以便被中断线程恢复执行时再次按照中断时的指令地址继续执行
CPU可能执行到线程的一半,下次从中断的地方开始重新执行。程序技术器记录中断线程的位置。

Java虚拟机栈

每个线程会对应一个Java栈。
每个Java栈由若干栈帧组成。
每一个栈帧对应一个方法。
栈帧在方法运行时,创建并入栈;方法执行完,该栈帧弹出并返回。
栈顶的栈帧叫活动栈,表示当前执行的方法,只有栈顶的栈帧才可以被CPU执行。
线程请求的栈深度大于虚拟机所允许的深度,将抛出StackOverflowError异常。
栈的深度可以进行动态拓展,但是当扩展到无法申请到足够的内存,就会抛出OutOfMemoryError异常。

每一个栈帧的结构如下
栈帧

  • 栈帧1代表创建的线程的第一个方法,其他依次类推。
  • 每一个栈帧的结构都有:局部变量表、操作数栈、动态连接方法、返回地址
    • 局部变量指的是基本类型的。
    • 引用类型的话变量在栈帧结构里,但是具体的对象在堆里。

方法区MethodArea

方法区是Java堆的永久区(PermanetGeneration)
方法区存放了要加载的类的信息(名称、修饰符等)、类中的静态常量、类中定义为final类型的常量、类中的Field字段信息、类中的方法信息,
方法区是在堆中,方法区被Java线程共享的。
方法区要使用的内存超过其允许的大小时,会抛出OutOfMemoryError: PremGen space的错误信息。注意,会带提示信息:PremGen space。
一般是因为加载的类过多。

常量池ConstantPool

常量池是方法区的一部分。顾名思义,存储常量的。比如字符串,比如被final修饰的变量。

本地方法栈Native Method Stack

本地方法栈和Java栈所发挥的作用非常相似,区别不过是Java栈为JVM执行Java方法服务,而本地方法栈为JVM执行Native方法服务。
本地方法栈也会抛出StackOverflowError和OutOfMemoryError异常。

Java内存模型的工作

在讨论了Java内存模型之后,我们来讨论Java内存模型的工作。

Java内存模型工作示意图

  1. 首先类加载器将Java代码对应的Class文件加载到方法区。
  2. 方法区存储类的名称,关键字,方法,方法名,参数,返回值类型。
  3. 然后执行引擎从方法区找到main方法。
  4. 为main方法创建虚拟机栈,同时创建该main方法的程序计数器。
  5. 执行引擎请求CPU执行该方法。
  6. CPU将方法栈数据加载到工作内存(寄存器和高速缓存),执行该方法。
  7. CPU执行完之后将执行结果从工作内存同步到主内存。
  8. 例如我们的代码,创建了Object对象。那么这个对象同步到堆中。
  9. 假如我们再对这个Object对象进行更改,这个更改会在寄存器和高速缓存完成,再保存到堆当中。

在部分不考虑CPU的资料中,把Java栈称之为工作内存,把堆称之为主内存。我们这里把CPU考虑了,把继承器和高速换成称之为工作内存。

如果只有一个线程的话,上述过程没有一点问题。
但是如果是多线程,可能一个线程在堆里面,正在读。另一个线程在高速缓存里已经修改了,还没有保存到堆里。
这就造成了线程不安全。

要解决这些问题就涉及到多线程编程三个特性:原子性,有序性,可见性。

多线程特性

通过上文对"Java内存模型的工作"的讨论,我们更深刻的理解了线程不安全的原因。
如果要解决线程不安全,多线程编程要保证满足三个特性:

  1. 原子性
  2. 可见性
  3. 有序性

我们依次讨论。

原子性

这个名词的来源于,原子不可分。
其实根据现在的科学知识,我们直到原子是可分的。还有电子、质子和中子各种。但这个不重要。
我们就理解为原子是不可分。

原子性,即一个操作或者多个操作,要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

可见性

可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

有序性

有序性即程序执行的顺序按照我们的预期顺序执行。
在上一章《7.多线程 [1/2]》,我们利用同步关键字和锁等方法解决了。

线程不安全解决方法

为了保证多线程的三个特性,Java引入了很多线程控制机制,下面介绍其中常用的几种:

  1. ThreadLocal:为每一个线程保存线程本地变量。
  2. 原子类:特点保证变量的写操作是原子性。
  3. Lock类:保证线程的有序性,按照我们预期的顺序。
  4. Volatile关键字:保证线程变量的可见性。

ThreadLocal

应用

这种方法,其实就是我们上一章《7.多线程 [1/2]》通俗讨论的让线程不要共享变量。
ThreadLocal提供线程局部变量,即为使用相同变量的每一个线程维护一个该变量的副本。
当某些数据是以线程为作用域并且不同线程具有不同的数据副本的时候,就可以考虑采用ThreadLocal,比如数据库连接Connection,每个请求处理线程都需要,但又不相互影响,就可以用ThreadLocal实现。

ThreadLocal的常用方法有:

方法名 说明
initialValue 创建副本
get 获取副本
set 设置副本

我们举个例子,两个线程分别转账,各自往各自转账,这样即使两个线程并发,也没有任何线程不安全的问题。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.kakawanyifan;

public class Bank {
ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};

public Integer add(Integer money) {
threadLocal.set(threadLocal.get() + money);
return threadLocal.get();
}

public Integer minus(Integer money) {
threadLocal.set(threadLocal.get() - money);
return threadLocal.get();
}

public Integer query() {
return threadLocal.get();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.kakawanyifan;

public class Transfer implements Runnable {
private Bank bank;

public Transfer(Bank bank) {
this.bank = bank;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
bank.add(10);
System.out.println(Thread.currentThread().getName() + " 余额:" + bank.query());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.kakawanyifan;

public class ThreadLocalDemo {

public static void main(String[] args) {
Bank bank = new Bank();
Transfer transfer = new Transfer(bank);

Thread t1 = new Thread(transfer, "一");
Thread t2 = new Thread(transfer, "二");

t1.start();
t2.start();
}

}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
一 余额:10
一 余额:20
二 余额:10
一 余额:30
二 余额:20
一 余额:40
一 余额:50
一 余额:60
二 余额:30
二 余额:40
一 余额:70
二 余额:50
二 余额:60
二 余额:70
二 余额:80
二 余额:90
一 余额:80
二 余额:100
一 余额:90
一 余额:100

源码分析

我们来看ThreadLocal的源码,主要看get方法和set方法的源码。

get方法
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

set方法
示例代码:

1
2
3
4
5
6
7
8
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

特别注意两行:

  1. Thread t = Thread.currentThread()
  2. ThreadLocalMap map = getMap(t)

这两行是关键,每一个线程绑定了一个线程专属的Map,在这个Map中保存值。从而确保两个线程并发,没有任何线程不安全的问题。

原子类

接下来,我们来讨论原子类。
在上一章我们讨论的线程安全的类《7.多线程 [1/2]》也具有和原子类比较类似的效果,但是实现原理不一样。
我们先来看应用。

应用

Java的java.util.concurrent.atomic包里面提供了很多可以进行原子操作的类,分为以下四类:

  • 原子基本类型:AtomicInteger、AtomicBoolean、AtomicLong等
  • 原子数组:AtomicIntegerArray、AtomicLongArray等
  • 原子引用类型:AtomicReference、AtomicStampedReference等
  • 原子属性类型:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater等

非原子性操作问题演示

非原子性的操作会引发什么问题呢?
下面以i++为例演示非原子性操作问题。
在之前的章节我们讨论过,i++实际上由三个操作构成。

  1. tp1 = i
  2. tp2 = tp1+1
  3. i = tp2

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.kakawanyifan;

public class AtomicClass {
static int n = 0;

public static void main(String[] args) throws InterruptedException {
int j = 0;
while (j < 10) {
n = 0;
Thread t1 = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
n++;
}
}
};
Thread t2 = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
n++;
}
}
};
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("n的最终值是:" + n);
j++;
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
n的最终值是:18
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20

解释说明:
两个线程并发的执行i++,在实验了10次后,发现n的最终值可能不是20

原子类解决非原子性操作问题

AtomicInteger的四个常用方法:

  • getAndIncrement:n++
  • incrementAndGet:++n
  • decrementAndGet:–n
  • getAndDecrement:n–

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.kakawanyifan;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicClass {
static AtomicInteger n;

public static void main(String[] args) throws InterruptedException {
int j = 0;
while (j < 1000) {
n = new AtomicInteger(0);
Thread t1 = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
n.getAndIncrement();;
}
}
};
Thread t2 = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
n.getAndIncrement();;
}
}
};
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("n的最终值是:" + n);
j++;
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20

【部分运行结果略】

n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20

解释说明:
我们实验了1000次,也没什么问题。执行结果如下:n的值永远是2000

源码分析:CAS

上文我们说原子类的实现原理和我们上一章《7.多线程 [1/2]》的线程安全的类不一样。
现在,我就来对原子类的源码进行分析。

我们以getAndIncrement为例。
示例代码:

1
2
3
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

再来看看getAndAddInt都做了什么。

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

解释一下:

  • this.getIntVolatile(var1, var2):var1是n的当前值,var2是地址偏移量。找到物理地址上寸的地址
  • compareAndSwapInt(var1, var2, var5, var5 + var4):这就是所谓的。

再来看看compareAndSwapInt都做了什么。

1
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5)

居然是一个native方法。
我们解释一下其大致过程。
首先:比对当前值(var1)和预期值(var5)是否相同。
如果相同:说明线程没有更改过该值,当前值(var1) = 期望值(var5) + 递增间隔(var4),返回true。不会进入循环。
如果不相同:说明有线程更改过该值,当前值(var1) = 期望值(var5),返回false。会进入循环。

这就是所谓的原子性的CAS,比较并交换。

ABA问题

CAS也有问题,这个问题我们称之为ABA问题。

现象

ABA问题比较难以代码进行复现,我们举例子说明。

假设张三同学的银行卡有100块钱余额,且假定银行转账操作就是一个单纯的CAS命令,对比余额旧值是否与当前值相同,如果相同则发生扣减/增加,我们将这个指令用CAS(origin,expect)表示。于是,我们看看接下来发生了什么:
张三在ATM-1转账100块钱给李四;
由于ATM-1出现了网络拥塞的原因卡住了,这时候张三跑到旁边的ATM-2再次操作转账;
ATM-2成功的执行了CAS(100,0),很痛快地完成了转账,此时张三的账户余额为0;
王五这时候又给张三账上转了100,此时张三账上余额为100;
这时候ATM-1网络恢复,继续执行CAS(100,0),居然执行成功了,张三账户上余额又变为了0;

出问题了!

再来捋一下这个过程。
这个故事有两条线。
第一条线:张三在ATM-1上转账100给李四,但是忽然网络卡住了。
第二条线:张三在ATM-2上转账100给李四,这时候余额从100变成了0,然后王五转账100给张三,余额再从0变成了100。即ABA
第一条线继续:ATM-1的操作继续,又转账成功了。

那么怎么办?
如果我们除了检查那100块钱还是不是之前100块钱,还检查一下钱的序列号?

凉粉

  • 我们给凉粉敲上一个序列号。

如果没有序列号呢?
那就敲个章,时间戳。

解决

在上文我们讨论的原子类的时候,提到了AtomicStampedReference,这个就可以用来解决ABA问题。
常用方法有

方法名 说明
getStamp 获取时间戳
getReference 获取预期值
compareAndSet(预期值,更新值,预期时间戳,更新时间戳) 实现CAS时间戳和预期值的比对

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.kakawanyifan;

import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicClass {
static AtomicStampedReference<Integer> n;
public static void main(String[] args) throws InterruptedException {
int j = 0;
while(j<1000){
n = new AtomicStampedReference<Integer>(0,0);
Thread t1 = new Thread(){
public void run(){
for(int i=0; i<10; i++){
int stamp;
Integer reference;
do{
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
stamp = n.getStamp();
reference = n.getReference();
} while(!n.compareAndSet(reference, reference+1, stamp, stamp+1));
}
}
};
Thread t2 = new Thread(){
public void run(){
for(int i=0; i<10; i++){
int stamp;
Integer reference;
do{
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
stamp = n.getStamp();
reference = n.getReference();
} while(!n.compareAndSet(reference, reference+1, stamp, stamp+1));
}
}
};
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("n的最终值是:"+n.getReference());
j++;
}

}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20

【部分运行结果略】

n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20
n的最终值是:20

Lock类

Lock接口关系

在上一章《7.多线程 [1/2]》,我们已经讨论过Lock类,作为一种解决线程安全问题的方法。而且,当时我们讨论了synchronized和Lock的区别,其中一个:synchronized是Java的关键字,在jvm层面上。Lock是一个类。
而且,当时我们说了,说Lock是一个类,不准确,在这一章会详细讨论,就在这里。
Lock接口关系

Lock接口关系图

Lock和ReadWriteLock是两大锁的根接口。
Lock接口支持重入、公平等的锁规则:实现类有ReentrantLock、ReadLock和WriteLock。
ReadWriteLock接口定义读取者共享而写入者独占的锁,实现类:ReentrantReadWriteLock。

特别注意!ReentrantReadWriteLock不是继承了ReadLock和WriteLock,而是包含了ReadLock和WriteLock。
我们来看源码。

1
2
3
4
5
6
7
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;

在上一章,讨论synchronized和Lock的区别时候,我们还说了:synchronized,可重入,不可中断,非公平。Lock:可重入,可中断,可公平(两者皆可),类型更丰富。
在这里,我们就对锁的类型进行讨论。

可重入锁

不可重入锁,即线程请求它已经拥有的锁时会阻塞。
可重入锁,即线程可以进入它已经拥有的锁的同步代码块。

我们来演示一下。
不可重入锁,我们自己实现一个。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.kakawanyifan;

public class Lock{
private boolean isLocked = false;
public synchronized void lock() throws InterruptedException{
System.out.println("等待上锁");
while(isLocked){
wait();
}
isLocked = true;
System.out.println("上锁成功");
}
public synchronized void unlock(){
isLocked = false;
notify();
System.out.println("解锁成功");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.kakawanyifan;

public class LockDemo {

private static Lock lock = new Lock();
public static void main(String[] args) throws InterruptedException {
lock.lock();
doSomething();
lock.unlock();
}

public static void doSomething() throws InterruptedException {
lock.lock();
//do something
lock.unlock();
}
}

运行结果:

1
2
3
等待上锁
上锁成功
等待上锁

解释说明:
第二次上锁就一直处于等待,无法上锁成功。

再来看看可重入锁。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.kakawanyifan;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
public static void main(String[] args) throws InterruptedException {

ReentrantLock lock = new ReentrantLock();

for (int i = 1; i <= 3; i++) {
lock.lock();
System.out.println("加锁 " + i);
}

for(int i=1;i<=3;i++){
try {
System.out.println("解锁 " + i);
} finally {
lock.unlock();
}
}
}
}

运行结果:

1
2
3
4
5
6
加锁 1
加锁 2
加锁 3
解锁 1
解锁 2
解锁 3

读写锁

读写锁:
可以同时读,读的时候不能写;
不能同时写,写的时候不能读。

这也是所谓的"悲观锁",读的时候不允许写,就是悲观锁,ReadWriteLock。

与之相对的还有"乐观锁",读的过程允许写入,这时候读的数据可能不一致,StampedLock。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.kakawanyifan;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
private Map<String, Object> map = new HashMap<String, Object>();
//创建一个读写锁实例
private ReadWriteLock rw = new ReentrantReadWriteLock();
//创建一个读锁
private Lock r = rw.readLock();
//创建一个写锁
private Lock w = rw.writeLock();

/**
* 读操作
*
* @param key
* @return
*/
public Object get(String key) {
r.lock();
System.out.println(Thread.currentThread().getName() + "读操作开始执行......");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
return map.get(key);
} finally {
r.unlock();
System.out.println(Thread.currentThread().getName() + "读操作执行完成......");
}
}

/**
* 写操作
*
* @param key
* @param value
*/
public void put(String key, Object value) {
try {
w.lock();
System.out.println(Thread.currentThread().getName() + "写操作开始执行......");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
} finally {
w.unlock();
System.out.println(Thread.currentThread().getName() + "写操作执行完成......");
}
}

public static void main(String[] args) {
final ReadWriteLockDemo d = new ReadWriteLockDemo();
d.put("key1", "value1");

new Thread(new Runnable() {
public void run() {
d.get("key1");
}
}).start();
new Thread(new Runnable() {
public void run() {
d.get("key1");
}
}).start();
new Thread(new Runnable() {
public void run() {
d.get("key1");
}
}).start();
}

}

运行结果:

1
2
3
4
5
6
7
8
main写操作开始执行......
main写操作执行完成......
Thread-0读操作开始执行......
Thread-1读操作开始执行......
Thread-2读操作开始执行......
Thread-0读操作执行完成......
Thread-2读操作执行完成......
Thread-1读操作执行完成......

公平锁

公平锁是Lock锁自带的功能,我们用Lock锁来演示。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.kakawanyifan;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SellTicket implements Runnable {
private int tickets = 100;
private Lock lock = new ReentrantLock(true);

@Override
public void run() {
while (true) {
try {
lock.lock();
if (tickets > 0) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第" + tickets + "张票");
tickets--;
}
} finally {
lock.unlock();
}
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
窗口1正在出售第100张票
窗口2正在出售第99张票
窗口3正在出售第98张票
窗口1正在出售第97张票
窗口2正在出售第96张票
窗口3正在出售第95张票
窗口1正在出售第94张票
窗口2正在出售第93张票
窗口3正在出售第92张票
窗口1正在出售第91张票
窗口2正在出售第90张票

【部分运行结果略】

窗口2正在出售第9张票
窗口3正在出售第8张票
窗口1正在出售第7张票
窗口2正在出售第6张票
窗口3正在出售第5张票
窗口1正在出售第4张票
窗口2正在出售第3张票
窗口3正在出售第2张票
窗口1正在出售第1张票

解释说明:
这时候不再是抢占了,所以公平的,大家排队,轮流。

Volatile关键字

概述

在上文还提到了,多线程的一个特性,可见性。Volatile关键字就是用来实现可见性的。

其实,除了可见性,这个关键字还有一个含义,不允许进行指令重排。

我们解释一下什么是指令重排。
比如

1
2
3
4
int i;
i = 1;
i = 2;
i = 3;

然后不会执行i=1和1=2。
但是如果进行volatile修饰了,会严格按照步骤执行。

应用

使用volatile必须满足以下两个条件:

  1. 对变量的写操作不依赖于当前值。
  2. 该变量没有包含在具有其他变量的不变式中。

因此,常见应用场景如下:

  1. 状态量标记
  2. 双重校验

状态量标记
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.kakawanyifan;

import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class VolatileDemo {
public static volatile boolean flag = true;
public static void main(String[] args) {
// 用于演示效果
new Thread(()->{
// 接收控制台参数
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String s = scanner.nextLine();
// 如果控制输入stop 将flag设置false, 所有任务都会执行完毕
if ("stop".equals(s)){
flag = false;
break;
}
}
}).start();
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy ());
for (int i = 0; i < 20; i++) {
try {
// 循环执行 20个任务
executor.execute(new MyRunnable("第"+(i+1)+"号任务"));
} catch (Throwable e) {
e.printStackTrace();
System.out.println("丢弃任务: " + (i+1) );
}
}
}
static class MyRunnable implements Runnable{
private String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +"==>" +name);
while (flag){ // true
//flag是一个开关,为true时线程任务会一直执行让线程一直执行
}
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
pool-1-thread-1==>第1号任务
pool-1-thread-3==>第13号任务
pool-1-thread-2==>第2号任务
pool-1-thread-5==>第15号任务
pool-1-thread-4==>第14号任务
丢弃任务: 16
丢弃任务: 17
丢弃任务: 18
丢弃任务: 19
丢弃任务: 20
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@7b23ec81 rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@3feba861 rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@6f496d9f rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@10f87f48 rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@2f4d3709 rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
stop
pool-1-thread-1==>第4号任务
pool-1-thread-2==>第3号任务
pool-1-thread-2==>第8号任务
pool-1-thread-2==>第9号任务
pool-1-thread-2==>第10号任务
pool-1-thread-2==>第11号任务
pool-1-thread-2==>第12号任务
pool-1-thread-1==>第7号任务
pool-1-thread-5==>第5号任务
pool-1-thread-3==>第6号任务

这就是作为状态量标记的应用。

特别的,我们可以仔细看看运行结果。这个运行结果很有意思,我们会在本章讨论线程池的时候进行讨论。

双重校验
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Singleton{
    private volatile static Singleton instance = null;
 
    private Singleton() {
 
    }
 
    public static Singleton getInstance() {
        if(instance==null) {
            synchronized (Singleton.class) {
                if(instance==null)
                    instance = new Singleton();
            }
        }
        return instance;
    }
}

解释说明:
我们假如没有第二个if(instance==null)

1
2
3
4
5
6
7
8
public static Singleton getInstance() {
    if(instance==null) {
        synchronized (Singleton.class) {
                instance = new Singleton();
        }
    }
    return instance;
}

线程一进入到getInstance()这个方法,首先判断if(instance==null),不是空,所以进入synchronized (Singleton.class),上锁。然后这时候线程二也进入到了getInstance(),也判断if(instance==null),也进入到synchronized (Singleton.class),但这时候只能在外面等着。然后线程一继续,instance = new Singleton();,然后释放锁。线程二获得锁,然后又执行instance = new Singleton();。这时候就出问题了。

并发容器

常见的并发容器

在上一章我们讨论过线程安全的类,其特点是都用synchronized进行同步,也被称为同步容器。这样保证了线程的安全性,但代价就是严重降低了并发性能,当多个线程竞争容器时,吞吐量严重降低。
更好的方法是并发容器,在java.util.concurrent包中

并发容器如下:

  1. ConcurrentHashMap
    对应的非并发容器:HashMap
    目标:代替Hashtable、synchronizedMap。
  2. CopyOnWriteArrayList
    对应的非并发容器:ArrayList
    目标:代替Vector、synchronizedList
  3. CopyOnWriteArraySet
    对应的费并发容器:HashSet
    目标:代替synchronizedSet
  4. ConcurrentSkipListMap
    对应的非并发容器:TreeMap
    目标:代替synchronizedSortedMap(TreeMap)
  5. ConcurrentSkipListSet
    对应的非并发容器:TreeSet
    目标:代替synchronizedSortedSet
  6. ConcurrentLinkedQueue
    不会阻塞的队列
    对应的非并发容器:Queue
  7. LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue
    对应的非并发容器:BlockingQueue
    特点:拓展了Queue,增加了可阻塞的插入和获取等操作
    • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
    • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
    • PriorityBlockingQueue:按优先级排序的队列

ConcurrentHashMap源码分析

接下来以ConcurrentHashMap为例,讨论并发容器是怎么做到线程安全的。

首先我们要知道的是HashMap的数据结构,这我们在《算法入门经典(Java与Python描述):7.哈希表》有过讨论。
结构如下图:
结构如下图

在知道了结构之后,我们来看源码。
首先看put方法的源码。
示例代码:

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

调用了putVal,再来看看putVal的源码。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
  • 有两处需要特别注意。
    一处是casTabAt
    一处是synchronized(f)

casTabAt的作用是放数组中的元素,调用了我们之前讨论过的CAS方法。
示例代码:

1
2
3
4
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

synchronized(f),在放链表数据的时候,加锁,f是头节点。

有了这两个,就可以做到线程安全吗?
做不到。
我们在《算法入门经典(Java与Python描述):7.哈希表》讨论过,还有动态扩容、数据迁移和初始化等。
这些操作也要线程安全才行。

那么,这些线程是怎么做到线程安全的呢?
先看初始化。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 初始化的"功劳"被其他线程"抢去"了
if ((sc = sizeCtl) < 0)
//放弃执行权
Thread.yield();
// CAS一下,将 sizeCtl 设置为 -1,代表抢到了锁
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 初始化
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

注意看,if ((sc = sizeCtl) < 0),把sizeCtl赋值给sc,然后如果sc小于0,就放弃CPU的执行权。否则的话呢,调用if (U.compareAndSwapInt(this, SIZECTL, sc, -1))方法,给sizeCtl赋值-1,锁住。利用这种方法做到线程安全。

再来看动态扩容。
动态扩容方法是treeifyBin(),然后这个方法再调用了tryPresize。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {

【部分代码略】

}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {

【部分代码略】

if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}

逻辑类似,注意两行:
while ((sc = sizeCtl) >= 0)
if (U.compareAndSwapInt(this, SIZECTL, sc, -1))

数据迁移方法是transfer,逻辑也是类似的。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

线程池

接下来,我们讨论线程池。在上一章《7.多线程 [1/2]》,在讨论创建线程的方法的时候,有一种方法就是线程池。
这一章,我们会详细的讨论线程池。

多线程的缺点

多线程有什么缺点?
在上一章《7.多线程 [1/2]》,讨论线程调度的时候,我们讲过,CPU有一个一个细小的时间片,线程需要去抢占时间片,也就是说CPU需要在多线程之间不断的切换。这个非常消耗资源。
除此之外,还有一个,也是在上一章《7.多线程 [1/2]》,我们讨论过线程的生命周期。一共有新建、就绪、运行、阻塞和销毁几个状态,其中新建和销毁也非常消耗资源。

到这里,就可以解释我们在之前章节说的多线程不一定快了,原因就是这两点。

  1. 线程创建和销毁都非常耗时并消耗资源。
  2. 线程之间的切换也会非常耗时并消耗资源。

线程池介绍

正因为上述缺点,所以有了线程池这种东西。

可以通过有限的几个固定线程为大量的操作服务,减少了创建和销毁线程所需的时间,从而提高效率。

在开发过程中,合理地使用线程池能够带来3个好处。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程数量选择

注意,在上文我们一直说合理的使用线程,合理的使用线程。
因为多线程只是节约了创建和销毁的资源,并没有节约CPU在多个线程之间不断的切换的时间。
那么,怎么线程应该设置多大合适呢?

CPU密集型运算

假如是CPU密集型运算
经验公式如下:

线程数=CPU核数+1\text{线程数} = \text{CPU核数} + 1

这样能够实现最优的CPU利用率。
+1+1是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证CPU时间片不被浪费

I/O密集型运算

假如是I/O密集型运算
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时、包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下

线程数=核数期望CPU利用率总时间CPU计算时间\text{线程数} = \text{核数} * \text{期望CPU利用率} * \frac{\text{总时间}}{\text{CPU计算时间}}

  • 总时间=CPU计算时间+等待时间\text{总时间} = \text{CPU计算时间} + \text{等待时间}

例如4核CPU计算时间是50%,其它等待时间是50%,期望CPU被100%利用,则有

4100%100%50%=84 * 100\% * \frac{100\%}{50\%} = 8

例如4核CPU计算时间是10%,其它等待时间是90%,期望CPU被100%利用,套用公式

4100%100%10%=404 * 100\% * \frac{100\%}{10\%} = 40

线程池的工作过程

在上文讨论volatile关键字的时候,我们举了一个用作状态量标记的例子,并且我们说运行结果很有意思,会在本章讨论线程池的时候进行讨论。就在这里。

为了方便大家看,我们把上述的代码搬过来。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.kakawanyifan;

import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class VolatileDemo {
public static volatile boolean flag = true;
public static void main(String[] args) {
// 用于演示效果
new Thread(()->{
// 接收控制台参数
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String s = scanner.nextLine();
// 如果控制输入stop 将flag设置false, 所有任务都会执行完毕
if ("stop".equals(s)){
flag = false;
break;
}
}
}).start();
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy ());
for (int i = 0; i < 20; i++) {
try {
// 循环执行 20个任务
executor.execute(new MyRunnable("第"+(i+1)+"号任务"));
} catch (Throwable e) {
e.printStackTrace();
System.out.println("丢弃任务: " + (i+1) );
}
}
}
static class MyRunnable implements Runnable{
private String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +"==>" +name);
while (flag){ // true
//flag是一个开关,为true时线程任务会一直执行让线程一直执行
}
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
pool-1-thread-1==>第1号任务
pool-1-thread-2==>第2号任务
pool-1-thread-3==>第13号任务
pool-1-thread-4==>第14号任务
pool-1-thread-5==>第15号任务
丢弃任务: 16
丢弃任务: 17
丢弃任务: 18
丢弃任务: 19
丢弃任务: 20
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@7b23ec81 rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@3feba861 rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@6f496d9f rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@10f87f48 rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
java.util.concurrent.RejectedExecutionException: Task com.kakawanyifan.VolatileDemo$MyRunnable@2f4d3709 rejected from java.util.concurrent.ThreadPoolExecutor@6acbcfc0[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.kakawanyifan.VolatileDemo.main(VolatileDemo.java:37)
stop
pool-1-thread-1==>第4号任务
pool-1-thread-2==>第3号任务
pool-1-thread-2==>第8号任务
pool-1-thread-2==>第9号任务
pool-1-thread-2==>第10号任务
pool-1-thread-2==>第11号任务
pool-1-thread-2==>第12号任务
pool-1-thread-1==>第7号任务
pool-1-thread-5==>第5号任务
pool-1-thread-3==>第6号任务

线程池的工作原理

线程池在刚创建的时候,里面一个线程也没有。

现在要执行第1号任务了,线程池里没有线程,那谁来执行?
这就是第一个判断。
第一个判断:判断核心线程数
判断正在运行的工作线程是否小于设置的核心线程数,小于尝试创建一个新的工作线程,如果不小于进入下一个判断。
所以,pool-1-thread-1==>第1号任务,第一个线程执行第1号任务。

然后我们执行第2号任务,同样进入第一个判断,所以,pool-1-thread-2==>第2号任务,第二个线程执行第2号任务。

再执行第3号任务,判断正在运行的工作线程是否小于设置的核心线程数,不小于。所以,进入下一个判断。
第二个判断:判断任务队列
判断当前线程池的任务队列是否已满,未满的话将任务加入任务队列,如果满了,进入下一个判断。
所以,第3号任务进入队列。
我们的队列大小是10,所以:3、4、5、6、7、8、9、10、11、12都会进入队列。

再执行第13号任务,判断当前线程池的任务队列是否已满,满了。所以,进入下一个判断。
第三个判断:判断最大线程数
判断当前线程池的工作线程是否小于设置的最大线程数,小于尝试创建一个新的临时工作线程,如果不小于进入下一判断。
所以,pool-1-thread-3==>第13号任务,第三个线程执行第13号任务。
同理,pool-1-thread-4==>第14号任务,第四个线程执行第14号任务;pool-1-thread-5==>第15号任务,第五个线程执行第15号任务。

再执行第16号任务,再创建临时线程。不,最大线程数是5,核心线程是2,临时线程是3。所以,进入下一个判断。
第四个判断:判断饱和拒绝策略
到此流程,说明当前线程池已经饱和,需要进行饱和拒绝策略,根据设置的策略进行处理。
这里我们的拒绝策略是什么?new ThreadPoolExecutor.AbortPolicy (),丢弃后续的任务,并抛出异常。
所以,丢弃任务: 16丢弃任务: 17丢弃任务: 18丢弃任务: 19丢弃任务: 20

然后我们在控制台输入了stop,所以while循环不再是死循环了。
所以对列里剩余的10个任务也执行了。

线程池中线程的销毁

通过上述的讨论,我们已经知道了线程池的工作过程。
那么,什么时候线程池会被销毁呢?
在上一章《7.多线程 [1/2]》,我们讨论过。

  1. executorService.shutdown();
    等正在进行任务执行完,进行停止。线程池会在workQueue中的任务执行完毕后销毁所有线程,关闭线程池。
  2. executorService.shutdownNow();
    不用等待正在进行任务执行完,立即停止。workQueue中的未完成任务 会作为返回值返回。

还有吗?
如果是临时线程,而且如果空闲时间达到keepAlivedTime的TimeUnit的值后,会被销毁。
那么,核心线程呢?默认情况下,核心线程永远不会被回收,但是我们可以通过allowCoreThreadTimeOut,设置核心线程的超时时间。

示例代码:

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy ());
executor.allowCoreThreadTimeOut(true);
  • 注意:executor.allowCoreThreadTimeOut(true),如果10秒内都没有新的任务的话,都会被销毁。

线程池的饱和拒绝策略

在上文的例子中,如果线程池中工作线程数量已经达到最大线程,并且任务队列已满。这时候就会根据我们设置的饱和拒绝策略进行处理。
ThreadPoolExecutor中 内置了4种拒绝策略:

  1. CallerRunsPolicy: 不丢弃任务,让调用线程池的线程帮忙执行任务。
  2. AbortPolicy: 丢弃后续的任务,并抛出异常【默认的是这种】。
  3. DiscardOldestPolicy: 丢弃任务队列中 存放最久的任务,不抛异常。
  4. DiscardPolicy: 丢弃后续任务,不抛异常。

特别的,我们可以看看这四种策略的源代码。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static class CallerRunsPolicy implements RejectedExecutionHandler {

【部分代码略】

}


public static class AbortPolicy implements RejectedExecutionHandler {

【部分代码略】

}


public static class DiscardPolicy implements RejectedExecutionHandler {

【部分代码略】

}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

【部分代码略】

}

都实现了RejectedExecutionHandler这个接口。

这里比较难理解的是,CallerRunsPolicy,不丢弃任务,让调用线程池的线程帮忙执行任务。
我们举例子来说明。

示例代码:

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy ());
executor.allowCoreThreadTimeOut(true);

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pool-1-thread-1==>第1号任务
pool-1-thread-3==>第13号任务
pool-1-thread-2==>第2号任务
pool-1-thread-4==>第14号任务
main==>第16号任务
pool-1-thread-5==>第15号任务
stop
main==>第17号任务
pool-1-thread-3==>第4号任务
pool-1-thread-4==>第3号任务
pool-1-thread-2==>第8号任务
pool-1-thread-3==>第7号任务
pool-1-thread-5==>第6号任务
pool-1-thread-1==>第5号任务
pool-1-thread-5==>第12号任务
pool-1-thread-3==>第11号任务
pool-1-thread-2==>第10号任务
pool-1-thread-4==>第9号任务
pool-1-thread-3==>第20号任务
pool-1-thread-5==>第19号任务
pool-1-thread-1==>第18号任务

线程池的核心参数

在上文我们已经用了ThreadPoolExecutor这个类,现在,我们回过头,再来系统的讨论这个类。

核心构造器参数

组件 含义
int corePoolSize 核心线程池的大小
int maximumPoolSize 最大线程池的大小
BlockingQueue workQueue 用来暂时保存任务的工作队列
RejectedExecutionHandler 当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池的大小且工作队列已满),execute()方法将要调用的Handler
long keepAliveTime, 表示空闲线程的存活时间。
TimeUnit 表示keepAliveTime的单位。
ThreadFactory threadFactory 指定创建线程的线程工厂

线程池的三种队列

上述参数其实我们刚刚在讨论都讨论过了,除了队列。
线程池中一共有三种队列

  1. SynchronousQueue
    SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
    使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作。
  2. LinkedBlockingQueue
    LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
  3. ArrayBlockingQueue
    ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。

线程池工具类

在上一章《7.多线程 [1/2]》,讨论线程创建的时候,其实我们用的是线程池工具类,Executors。现在我们讨论一下。

Executors的四种线程

为了方便的创建线程池,Java中又定义了Executors类,Eexcutors类提供了四个创建线程池的方法,分别如下

  1. newCachedThreadPool
    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  2. newFixedThreadPool
    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3. newSingleThreadExecutor
    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  4. newScheduledThreadPool
    创建一个定长线程池,支持延时及周期性任务执行。

特别注意!
阿里巴巴的开发规范,不建议我们使用快捷创建线程池的方法,因为这个参数不一定是最适合的。

newCachedThreadPool

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.kakawanyifan;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolDemo {
public static void main(String[] args) {
// 创建线程池 可缓存的线程池
ExecutorService es = Executors.newCachedThreadPool();
// 会创建出10个线程 分别执行任务
for (int i = 0; i < 10; i++) {
es.execute(()->{
for (int j = 0; j < 10; j++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" + j);
}
});
}
es.shutdown();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
pool-1-thread-1:0
pool-1-thread-6:0
pool-1-thread-10:0

【部分运行结果略】

pool-1-thread-4:9
pool-1-thread-8:9
pool-1-thread-1:9

我们来看看源代码newCachedThreadPool的源代码。
示例代码:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

解释说明:
核心线程数是0、最大线程是Integer.MAX_VALUE,60秒销毁,而且队列中不能存任务。

newFixedThreadPool

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.kakawanyifan;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);
// 会创建出10个线程 分别执行任务
for (int i = 0; i < 10; i++) {
es.execute(()->{
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 10; j++) {
System.out.println(Thread.currentThread().getName() + ":" + j);
}
});
}
es.shutdown();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
pool-1-thread-2:0
pool-1-thread-2:1
pool-1-thread-2:2

【部分运行结果略】

pool-1-thread-2:9
pool-1-thread-1:8
pool-1-thread-1:9

我们来看看newFixedThreadPool的源代码。
示例代码:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

解释说明:
没有临时线程,无界队列,这个队列没有界限。

newSingleThreadExecutor

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.kakawanyifan;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorDemo {
public static void main(String[] args) {
// 创建线程池
ExecutorService es = Executors.newSingleThreadExecutor();
// 会创建出10个线程 分别执行任务
for (int i = 0; i < 10; i++) {
es.execute(()->{
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 10; j++) {
System.out.println(Thread.currentThread().getName() + ":" + j);
}
});
}
es.shutdown();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
pool-1-thread-1:0
pool-1-thread-1:1
pool-1-thread-1:2

【部分运行结果略】

pool-1-thread-1:7
pool-1-thread-1:8
pool-1-thread-1:9

我们来看看newSingleThreadExecutor的源代码。
示例代码:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

解释说明:
只有一个线程线程,没有临时线程,无界队列。

newScheduleThreadPool

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.kakawanyifan;

import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5);

// 延迟执行任务
ScheduledFuture<?> schedule = newScheduledThreadPool.schedule(() -> {
System.out.println(Instant.now() + " 延迟执行任务");
}, 3, TimeUnit.SECONDS);


// 周期性执行任务
ScheduledFuture<?> scheduledFuture = newScheduledThreadPool.scheduleAtFixedRate(() -> {
System.out.println(Instant.now() + " 周期性执行任务");
}, 0, 3, TimeUnit.SECONDS);

Thread.sleep(10000);

// 取消
scheduledFuture.cancel(false);

}
}

运行结果:

1
2
3
4
5
2021-09-16T14:32:08.253Z 周期性执行任务
2021-09-16T14:32:11.252Z 延迟执行任务
2021-09-16T14:32:11.252Z 周期性执行任务
2021-09-16T14:32:14.252Z 周期性执行任务
2021-09-16T14:32:17.255Z 周期性执行任务
  • scheduleAtFixedRate,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
  • scheduleWithFixedDelay,是以上一个任务结束时开始计时,period时间过去后,立即执行。

线程池中线程的复用

最后一个话题,线程池中的线程是如何实现复用的?
要回答这个问题,就不得不对java.util.concurrent.ThreadPoolExecutor进行更深入的分析。

创建线程

我们从这段代码开始。
示例代码:

1
2
3
4
/**
* Set containing all worker threads in pool. Accessed only when holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();

所有的"线程"都被放在了这个HashSet中。

再来看看Worker内部的结构怎么用的。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
【部分代码略】

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;

【部分代码略】

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

【部分代码略】

}

Worker实现了Runnable接口,还有两个成员变量,一个是Thread thread,一个是Runnable firstTask
还有一个构造方法,仔细看这个构造方法。
this.firstTask = firstTask:传入worker第一次要执行的任务。
this.thread = getThreadFactory().newThread(this):使用工厂对象创建线程, 并把worker本身传入。

所以,这是第一个任务来了,就创建线程。

run方法

那么,线程是怎么执行的呢?
实现的是Runnable接口,所以必定在run方法中。
示例代码:

1
2
3
public void run() {
runWorker(this);
}

看看runWorker都做了啥。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

注意,这里就有文章了。有一个while循环,只要还有任务,while循环就不会停止。
而且我们看到有这么一段task = getTask(),获取任务?
看看获取任务的方法。

获取任务

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

用for的死循环来获取任务?然后有一个timeOut。
那么死循环怎么退出?直接return,前几种都是return null,return null的话,那么while循环就会退出。
注意最后几行,一定要r不等于null,然后return r。
r从哪来?
示例代码:

1
2
3
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

timed怎来的?
示例代码:

1
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

如果允许核心线程TimeOut或者线程数大于了核心线程数,timed即使true,就会通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法获取r。
即如果是临时线程,用poll()方法,在指定的时间没有获取到任务,得到的就是null。如果是核心线程,用的是take,这个是阻塞队列里的方法,会一直阻塞到这里。

小结

每一个线程都是一个worker对象,当把工作者启动之后,工作者会执行一个while循环,不断的调用getTask()方法,去任务队列workQueue里获取任务,只要能得到任务,getTask()就会有返回值,while循环就会不断运行。如果是临时线程,用poll()方法,在指定的时间没有获取到任务,得到的就是null。如果是核心线程,用的是take,这个是阻塞队列里的方法,会一直阻塞到这里。特别的,如果核心线程设置了超时时间,也是poll。

文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/10808
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板