相关概念

进程:程序是静止的,进程实体的运行过程就是进程,是系统进行资源分配的基本单位

进程的特征:并发性、异步性、动态性、独立性、结构性

线程:线程是属于进程的,是一个基本的 CPU 执行单元,是程序执行流的最小单元。线程是进程中的一个实体,是系统独立调度的基本单位,线程本身不拥有系统资源,只拥有一点在运行中必不可少的资源,与同属一个进程的其他线程共享进程所拥有的全部资源

关系:一个进程可以包含多个线程,这就是多线程,比如看视频是进程,图画、声音、广告等就是多个线程

线程的作用:使多道程序更好的并发执行,提高资源利用率和系统吞吐量,增强操作系统的并发性能

并发并行:

  • 并行:在同一时刻,有多个指令在多个 CPU 上同时执行
  • 并发:在同一时刻,有多个指令在单个 CPU 上交替执行

同步异步:

  • 需要等待结果返回,才能继续运行就是同步
  • 不需要等待结果返回,就能继续运行就是异步

线程进程对比:

  • 进程基本上相互独立的,而线程存在于进程内,是进程的一个子集

  • 进程拥有共享的资源,如内存空间等,供其内部的线程共享

  • 进程间通信较为复杂

    同一台计算机的进程通信称为 IPC(Inter-process communication)

    • 信号量:信号量是一个计数器,用于多进程对共享数据的访问,解决同步相关的问题并避免竞争条件

    • 共享存储:多个进程可以访问同一块内存空间,需要使用信号量用来同步对共享存储的访问

    • 管道通信:管道是用于连接一个读进程和一个写进程以实现它们之间通信的一个共享文件 pipe 文件,该文件同一时间只允许一个进程访问,所以只支持

      半双工通信

      • 匿名管道(Pipes):用于具有亲缘关系的父子进程间或者兄弟进程之间的通信
      • 命名管道(Names Pipes):以磁盘文件的方式存在,可以实现本机任意两个进程通信,遵循 FIFO
    • 消息队列:内核中存储消息的链表,由消息队列标识符标识,能在不同进程之间提供

      全双工通信

      ,对比管道:

      • 匿名管道存在于内存中的文件;命名管道存在于实际的磁盘介质或者文件系统;消息队列存放在内核中,只有在内核重启(操作系统重启)或者显示地删除一个消息队列时,该消息队列才被真正删除
      • 读进程可以根据消息类型有选择地接收消息,而不像 FIFO 那样只能默认地接收

    不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP

    • 套接字:与其它通信机制不同的是,可用于不同机器间的互相通信
  • 线程通信相对简单,因为线程之间共享进程内的内存,一个例子是多个线程可以访问同一个共享变量

    Java 中的通信机制:volatile、等待/通知机制、join 方式、InheritableThreadLocal、MappedByteBuffer

  • 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低


Java线程

实现多线程的方式

1.继承Thread类,重写该类的run()方法

Thread 构造器:

  • public Thread()
  • public Thread(String name)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ThreadDemo {
public static void main(String[] args) {
Thread t = new MyThread();
t.start();
for(int i = 0 ; i < 100 ; i++ ){
System.out.println("main线程" + i)
}
// main线程输出放在上面 就变成有先后顺序了,因为是 main 线程驱动的子线程运行
}
}
class MyThread extends Thread {
@Override
public void run() {
for(int i = 0 ; i < 100 ; i++ ) {
System.out.println("子线程输出:"+i)
}
}
}
2.实现Runnable接口,并重写该接口的run()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadDemo {
public static void main(String[] args) {
Runnable target = new MyRunnable();
Thread t1 = new Thread(target,"1号线程");
t1.start();
Thread t2 = new Thread(target);//Thread-0
}
}

public class MyRunnable implements Runnable{
@Override
public void run() {
for(int i = 0 ; i < 10 ; i++ ){
System.out.println(Thread.currentThread().getName() + "->" + i);
}
}
}
3.实现 Callable 接口:接口创建线程。
  1. 定义一个线程任务类实现 Callable 接口,申明线程执行的结果类型
  2. 重写线程任务类的 call 方法,这个方法可以直接返回执行的结果
  3. 创建一个 Callable 的线程任务对象
  4. 把 Callable 的线程任务对象包装成一个未来任务对象
  5. 把未来任务对象包装成线程对象
  6. 调用线程的 start() 方法启动线程

public FutureTask(Callable<V> callable):未来任务对象,在线程执行完后得到线程的执行结果

  • FutureTask 就是 Runnable 对象,因为 Thread 类只能执行 Runnable 实例的任务对象,所以把 Callable 包装成未来任务对象
  • 线程池部分详解了 FutureTask 的源码

public V get():同步等待 task 执行完毕的结果,如果在线程中获取另一个线程执行结果,会阻塞等待,用于线程同步

  • get() 线程会阻塞等待任务执行完成
  • run() 执行完后会把结果设置到 FutureTask 的一个成员变量,get() 线程可以获取到该变量的值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MyThread implements Callable<String> {
private int count = 20;

@Override
public String call() throws Exception {
for (int i = count; i > 0; i--) {
// Thread.yield();
System.out.println(Thread.currentThread().getName()+"当前票数:" + i);
}
return "sale out";
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
Callable<String> callable =new MyThread();
FutureTask <String>futureTask=new FutureTask<>(callable);
Thread mThread=new Thread(futureTask);
Thread mThread2=new Thread(futureTask);
Thread mThread3=new Thread(futureTask);
// mThread.setName("hhh");
mThread.start();
mThread2.start();
mThread3.start();
System.out.println(futureTask.get());

}
}
4.通过线程池启动多线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
public static void main(String[] args) {
ExecutorService ex=Executors.newFixedThreadPool(5);

for(int i=0;i<5;i++) {
ex.submit(new Runnable() {

@Override
public void run() {
for(int j=0;j<10;j++) {
System.out.println(Thread.currentThread().getName()+j);
}

}
});
}
ex.shutdown();
}
}

常用方法

start和run

  • 直接调用 run 是在主线程中执行了 run,没有启动新的线程
  • 使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码

sleep 与 yield

  • sleep:

    • 调用 sleep 会让当前线程从 Running进入 Timed Waiting 状态(阻塞)
    • 其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
    • 睡眠结束后的线程未必会立刻得到执行
    • 建议用 TimeUnit(时间单位) 的 sleep 代替 Thread 的 sleep 来获得更好的可读性
  • yield

    • 调用 yield 会让当前线程从 Running 进入 Runnable就绪状态,然后调度执行其它线程
    • 具体的实现依赖于操作系统的任务调度器
  • 线程优先级

    • 线程优先级会提示(hint)调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它
    • 如果 cpu 比较忙,那么优先级高的线程会获得更多的时间片,但 cpu 闲时,优先级几乎没作用

在没有利用 cpu 来计算时,不要让 while(true) 空转浪费 cpu,这时可以使用 yield 或 sleep 来让出 cpu 的使用权 给其他程序

1
2
3
4
5
6
7
while(true) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
  • join
    • 等待线程运行结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int r = 0;
public static void main(String[] args) throws InterruptedException {
test1();
}

private static void test1() throws InterruptedException {
log.debug("开始");
Thread t1 = new Thread(() -> {
log.debug("开始");
sleep(1);
log.debug("结束");
r = 10;
});
t1.start();
t1.join();
log.debug("结果为:{}", r);
log.debug("结束");
}

需要等待结果返回,才能继续运行就是同步

不需要等待结果返回,就能继续运行就是异步

  • interrupt

  • 打断 sleep,wait,join 的线程

    • 这几个方法都会让线程进入阻塞状态
  • 打断 sleep 的线程, 会清空打断状态,以 sleep 为例

    • 打断状态:描述一个线程是否被打断的布尔值
  • 打断正常运行的线程不会清空打断状态

  • 两阶段终止模式

在一个线程 T1 中如何“优雅”终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。

1
2
3
4
5
6
7
8
9
graph TD
W("while(true)") --> a
a("有没有被打断?") --是--> b(料理后事)
b --> c((结束循环))
a--否 --> d(睡眠2s)
d --无异常--> e(执行监控记录)
d --有异常--> i(设置打断标记)
i-->W
e-->W
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class TPTInterrupt {
private Thread thread;
public void start(){
thread = new Thread(() -> {
while(true) {
Thread current = Thread.currentThread();
if(current.isInterrupted()) {
log.debug("料理后事");
break;
}
try {
Thread.sleep(1000);
log.debug("将结果保存");
} catch (InterruptedException e) {
current.interrupt();
}
// 执行监控操作
}
},"监控线程");
thread.start();
}
public void stop() {
thread.interrupt();
}
}
1
2
3
4
5
6
TPTInterrupt t = new TPTInterrupt();
t.start();

Thread.sleep(3500);
log.debug("stop");
t.stop();

守护线程

  • 默认情况下,Java 进程需要等待所有线程都运行结束,才会结束。
  • 有一种特殊的线程叫做守护线程,只要其它非守护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。
1
2
3
4
5
6
7
8
9
10
11
12
log.debug("开始运行...");
Thread t1 = new Thread(() -> {
log.debug("开始运行...");
sleep(2);
log.debug("运行结束...");
}, "daemon");
// 设置该线程为守护线程
t1.setDaemon(true);
t1.start();

sleep(1);
log.debug("运行结束...");

五种状态

  • 【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联

  • 【可运行状态】(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行

  • 【运行状态】指获取了 CPU 时间片运行中的状态

    • 当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
  • 【阻塞状态】

    • 如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入【阻塞状态】
    • 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
    • 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
  • 【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

synchronized

synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。

加在对象上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Room {
int value = 0;
public void increment() {
synchronized (this) {
value++;
}
}
public void decrement() {
synchronized (this) {
value--;
}
}
public int get() {
synchronized (this) {
return value;
}
}
}

@Slf4j
public class Test1 {

public static void main(String[] args) throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(() -> {
for (int j = 0; j < 5000; j++) {
room.increment();
}
}, "t1");

Thread t2 = new Thread(() -> {
for (int j = 0; j < 5000; j++) {
room.decrement();
}
}, "t2");

t1.start();
t2.start();
t1.join();
t2.join();
log.debug("count: {}" , room.get());
}
}

加在方法上:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Test{
public synchronized void test() {

}
}
等价于
class Test{
public void test() {
synchronized(this) {

}
}
}
  • 锁住了调用这个方法的对象

加在静态方法上:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Test{
public synchronized static void test() {

}
}
等价于
class Test{
public static void test() {
synchronized(Test.class) {

}
}
}
  • 锁住了当前类

共享模型之管程

管程可以看做⼀个软件模块,它是将共享的变量和对于这些共享变量的操作封装起来,形成⼀个具有⼀定接口的功能模块,进程可以调⽤管程来实现进程级别的并发控制。

变量的线程安全分析

成员变量和静态变量是否线程安全?

  • 如果它们没有共享,则线程安全

  • 如果它们被共享了,根据它们的状态是否能够改变,又分两种情况

    • 如果只有读操作,则线程安全
    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全

局部变量是否线程安全?

  • 局部变量是线程安全的

  • 但局部变量引用的对象则未必

    • 如果该对象没有逃离方法的作用访问,它是线程安全的
    • 如果该对象逃离方法的作用范围,需要考虑线程安全

常见线程安全类

  • String
  • Integer
  • StringBuffffer
  • Random
  • Vector
  • Hashtable
  • java.util.concurrent 包下的类

这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。

但注意它们多个方法的组合不是原子的

1
2
3
4
5
6
7
8
sequenceDiagram
participant t1 as 线程1
participant t2 as 线程2
participant table
t1 ->> table : get("key")==nu11
t2 ->> table : get("key")==nu1l
t2 ->> table : put("key", v2)
t1 ->> table : put("key", v1)
  • String、Integer 等都是不可变类,因为其内部的状态不可以改变,因此它们的方法都是线程安全的

原理之 Monitor(锁)

  • 刚开始 Monitor 中 Owner 为 null
  • 当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor中只能有一个 Owner
  • 在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行 synchronized(obj),就会进入EntryList(阻塞队列) BLOCKED
  • Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争的时是非公平的
  • 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析

synchronized 必须是进入同一个对象的 monitor 才有上述的效果

不加 synchronized 的对象不会关联监视器,不遵从以上规则

Monitor是操作系统C++实现的

轻量级锁

使用场景:如果一个对象虽然有多线程要加锁,但加锁的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。

  1. 创建 锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的Mark Word

  1. 让锁记录中 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录
  2. 如果 cas 替换成功,对象头中存储了 锁记录地址和状态 00 ,表示由该线程给对象加锁,这时图示如下

  • 如果 cas 失败,有两种情况

    • 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
    • 如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数
  • 当退出 synchronized 代码块(解锁时)锁记录的值不为 null,这时使用 cas 将 Mark Word 的值恢复给对象头

    • 成功,则解锁成功
    • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
重量级锁(膨胀)

如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。

当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁这时 Thread-1 加轻量级锁失败,进入锁膨胀流程

  • 即为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址
    然后自己进入 Monitor 的 EntryList BLOCKED

自旋优化

重量级锁竞争的时候,还可以使用自旋(循环尝试获取重量级锁)来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。 (进入阻塞再恢复,会发生上下文切换,比较耗费性能)

偏向锁

Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有

锁消除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

@Fork(1)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations=3)
@Measurement(iterations=5)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MyBenchmark {
static int x = 0;
@Benchmark
public void a() throws Exception {
x++;
}
@Benchmark
public void b() throws Exception {
//这里的o是局部变量,不会被共享,JIT做热点代码优化时会做锁消除
Object o = new Object();
synchronized (o) {
x++;
}
}
}

总结:synchronized默认是使用轻量级锁,轻量级锁发生抢占时会升级为重锁,然后阻塞队列可以通过自旋优化来尽可能减少阻塞

wait/notify

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入EntryList 重新竞争

  • obj.wait() 让进入 object 监视器的线程到 waitSet 等待
  • obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
  • obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

1
2
3
4
5
6
7
8
9
10
11
synchronized(lock) {
while(条件不成立) {
lock.wait();
}
// 干活
}

//另一个线程
synchronized(lock) {
lock.notifyAll();
}

保护性暂停

(同步)模式之保护性暂停

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

  • 一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式,因为要等待另一方的结果,因此归类到同步模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class GuardedObject {

private Object response;
private final Object lock = new Object();

public Object get() {
synchronized (lock) {
// 条件不满足则等待
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}

public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}

}
拓展
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class GuardedObject {//增加了等待时间
// 标识 Guarded Object
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
// 结果
private Object response;
// 获取结果
// timeout 表示要等待多久 2000
public Object get(long timeout) {
synchronized (this) {
// 开始时间 15:00:00
long begin = System.currentTimeMillis();
// 经历的时间
long passedTime = 0;
while (response == null) {
// 未等待时间 需等待时间 已等待时间
long waitTime = timeout - passedTime;
// 经历的时间超过了最大等待时间时,退出循环
if (timeout - passedTime <= 0) {
break;
}
try {
this.wait(waitTime); // 虚假唤醒 15:00:01
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求得经历时间
passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
}
return response;
}
}
// 产生结果
public void complete(Object response) {
synchronized (this) {
// 给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Mailboxes {
//中间解耦类,聚合了上方的中间类,表示信箱
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();

private static int id = 1;
// 产生唯一 id
private static synchronized int generateId() {
return id++;
}

public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}

public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}

public static Set<Integer> getIds() {
return boxes.keySet();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Postman extends Thread {
//模拟快递员
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
1
2
3
4
5
6
7
8
9
10
11
class People extends Thread{
//模拟收件人
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) throws InterruptedException {
//测试
for (int i = 0; i < 3; i++) {
new People().start();
}
Sleeper.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}

1
2
3
4
5
6
7
8
9
10
运行结果
10:35:05.689 c.People [Thread-1] - 开始收信 id:3
10:35:05.689 c.People [Thread-2] - 开始收信 id:1
10:35:05.689 c.People [Thread-0] - 开始收信 id:2
10:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容2
10:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容1
10:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容2
10:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容1
10:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容3
10:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3
  • 产生结果的线程和使用结果的线程一一对应

生产者/消费者模式

(异步)模式之生产者/消费者

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Message {
//定义消息
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}

class MessageQueue {
//定义消息队列
private LinkedList<Message> queue;//共享变量
private int capacity;

public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}

public Message take() {
//消费
synchronized (queue) {
while (queue.isEmpty()) {
log.debug("没货了, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = queue.removeFirst();
queue.notifyAll();//消费后通知生产者
return message;
}
}

public void put(Message message) {
//生产
synchronized (queue) {
while (queue.size() == capacity) {
log.debug("库存已达上限, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();//生产后通知消费者
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
MessageQueue messageQueue = new MessageQueue(2);

// 4 个生产者线程, 下载任务
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(() -> {
try {
log.debug("download...");
List<String> response = Downloader.download();
log.debug("try put message({})", id);
messageQueue.put(new Message(id, response));
} catch (IOException e) {
e.printStackTrace();
}
}, "生产者" + i).start();
}

// 1 个消费者线程, 处理结果
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
List<String> response = (List<String>) message.getMessage();
log.debug("take message({}): [{}] lines", message.getId(), response.size());
}
}, "消费者").start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
结果
10:48:38.070 [生产者3] c.TestProducerConsumer - download...
10:48:38.070 [生产者0] c.TestProducerConsumer - download...
10:48:38.070 [消费者] c.MessageQueue - 没货了, wait
10:48:38.070 [生产者1] c.TestProducerConsumer - download...
10:48:38.070 [生产者2] c.TestProducerConsumer - download...
10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1)
10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2)
10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0)
10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3)
10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines
10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines
10:48:41.240 [消费者] c.MessageQueue - 没货了, wait

park与unpark

  • 暂停当前线程

    • LockSupport.park();
  • 恢复某个线程的运行

    • LockSupport.unpark(暂停线程对象)

与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

死锁:在两个或多个并发进程中,如果每个进程持有某种资源而又都等待着别的进程释放它或它们现在保持着的资源,否则就不能向前推进,此时每个进程都占用了一定的资源但又都不能向前推进,称这一组进程产生了死锁

活锁:两个线程互相改变对方的结束条件,最后谁也无法结束

饥饿:当线程需要某些资源(通常是CPU资源) , 但却始终获得不到.造成饥饿的情况

ReentrantLock

相对于 synchronized 它具备如下特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁 (避免饥饿)
  • 支持多个条件变量

与 synchronized 一样,都支持可重入

基本语法

1
2
3
4
5
6
7
8
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁

如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

可打断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
log.debug("启动...");

try {
//没有竞争就会获取锁
//有竞争就进入阻塞队列等待,但可以被打断
lock.lockInterruptibly();
//lock.lock(); //不可打断
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}

try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();

try {
sleep(1);
log.debug("执行打断");
t1.interrupt();
} finally {
lock.unlock();
}
锁超时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
log.debug("启动...");

try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取等待 1s 后失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();

try {
sleep(2);
} finally {
lock.unlock();
}
条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
static ReentrantLock lock = new ReentrantLock();

static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();

static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;

public static void main(String[] args) {

new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();

sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}

private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}

private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}

共享模型之内存

  • 原子性 - 保证指令不会受到线程上下文切换的影响
  • 可见性 - 保证指令不会受 cpu 缓存的影响
  • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

可见性

先来看个问题:退不出的死循环?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static boolean run = true;

public static void main(String[] args) throws InterruptedException {

Thread t = new Thread(()->{
while(run){
// ....
}
});
t.start();

sleep(1);
run = false; // 线程t不会如预想的停下来
}
  • 为啥呢???
  1. 初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存。

  1. 因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中,减少对主存中 run 的访问,提高效率

  1. 1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值

解决办法

volatile(易变关键字)

它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存

可见性:它保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可见,但不能保证原子性,仅用在一个写线程,多个读线程的情况(也就是保证读取的是最新的,但不能阻止指令重排)

注意
synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是
synchronized 是属于重量级操作,性能相对更低

如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到对 run 变量的修改了,想一想为什么?
因为其内部包含了synchronized 的使用

犹豫模式

Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MonitorService {

// 用来表示是否已经有线程已经在执行启动了
private volatile boolean starting;

public void start() {
log.info("尝试启动监控线程...");
synchronized (this) {
if (starting) {
return;
}
starting = true;
}

// 真正启动监控线程...
}
}

有序性

JVM 会在不影响正确性的前提下,可以调整语句的执行顺序,思考下面一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int i;
static int j;

// 在某个线程内执行如下赋值操作
i = ...;
j = ...;


//可以看到,至于是先执行 i 还是 先执行 j ,对最终的结果不会产生影响。所以,上面代码真正执行时,既可以是
i = ...;
j = ...;
------------------------------------------
j = ...;
i = ...;

这种特性称之为『指令重排』,多线程下『指令重排』会影响正确性。

volatile原理

  • 如何保证可见性?

    • volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)
      • 对 volatile 变量的写指令后会加入写屏障 ,写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
      • 对 volatile 变量的读指令前会加入读屏障,而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
  • 如何保证有序性?

    • 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
    • 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前

还是那句话,不能解决指令交错

  • 写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证读跑到它前面去
  • 而有序性的保证也只是保证了本线程内相关代码不被重排序

共享模型之无锁

CAS与volatile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class AccountSafe implements Account {

private AtomicInteger balance; //原子整数

public AccountSafe(Integer balance) {
this.balance = new AtomicInteger(balance);
}

@Override
public Integer getBalance() {
return balance.get();
}

@Override
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev, next)) {//CAS
break;
}
}
}
}
  • 前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
    • 其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

vK8JMV.md.png

CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。

在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被

线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。

获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。

  • 它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。

CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

原子整数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
AtomicInteger i = new AtomicInteger(0);

// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());

// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());

// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());

// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());

// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));

// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));

// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));

// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));

// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));

// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

原子引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class DecimalAccountSafeCas implements DecimalAccount {
AtomicReference<BigDecimal> ref;

public DecimalAccountSafeCas(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}

@Override
public BigDecimal getBalance() {
return ref.get();
}

@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.subtract(amount);
if (ref.compareAndSet(prev, next)) {
break;
}
}
}

}

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况

如果主线程希望: 只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号

可使用AtomicStampedReference 实现

AtomicStampedReference ref = new AtomicStampedReference<>(“A”, 0);

不关心引用变量更改了几次,只是单纯的关心是否更改过,可使用 AtomicMarkableReference

AtomicMarkableReference ref = new AtomicMarkableReference<>(bag, true);

原子数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
参数1,提供数组、可以是线程不安全数组或线程安全数组
参数2,获取数组长度的方法
参数3,自增方法,回传 array, index
参数4,打印数组的方法
*/
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
private static <T> void demo(
Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer ) {

List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFun.apply(array);
for (int i = 0; i < length; i++) {
// 每个线程对数组作 10000 次操作
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j%length);
}
}));
}
ts.forEach(t -> t.start()); // 启动所有线程

ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}); // 等所有线程结束
printConsumer.accept(array);
}

原子数组实现:

1
2
3
4
5
6
demo(
()-> new AtomicIntegerArray(10),
(array) -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array)
);

原子字段更新器

  • 利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,
    否则会出现异常 Exception in thread “main” java.lang.IllegalArgumentException: Must be volatile type
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Test5 {
private volatile int field;

public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");

Test5 test5 = new Test5();
fieldUpdater.compareAndSet(test5, 0, 10);
// 修改成功 field = 10
System.out.println(test5.field);
// 修改成功 field = 20
fieldUpdater.compareAndSet(test5, 10, 20);
System.out.println(test5.field);
// 修改失败 field = 20
fieldUpdater.compareAndSet(test5, 10, 30);
System.out.println(test5.field);
}
}

共享模型之不可变

如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改

不可变设计

1
2
3
4
5
6
7
8
9
public final class String implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
/** Cache the hash code for the string */
private int hash; // Default to 0

// ...

}

发现该类、类中所有属性都是 final 的

  • 属性用 final 修饰保证了该属性是只读的,不能修改
  • 类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性

构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。
这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】

final

发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况

无状态

在 web 阶段学习时,设计 Servlet 时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的

因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为【无状态】


并发工具

线程池

  • 一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作

线程池作用:

  1. 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
  2. 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
  3. 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
  • 线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务
  • 池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销
原理

  1. 创建线程池,这时没有创建线程(懒惰),等待提交过来的任务请求,调用 execute 方法才会创建线程
  2. 当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列
    • 如果这时队列满了且正在运行的线程数量还小于 maximumPoolSize,那么会创建非核心线程立刻运行这个任务,对于阻塞队列中的任务不公平。这是因为创建每个 Worker(线程)对象会绑定一个初始任务,启动 Worker 时会优先执行
    • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
  3. 当一个线程完成任务时,会从队列中取下一个任务来执行
  4. 当一个线程空闲超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后最终会收缩到 corePoolSize 大小
创建方式

ThreadPoolExecutor

构造方法:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数介绍:

  • corePoolSize:核心线程数,定义了最小可以同时运行的线程数量

  • maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值又叫救急线程数

  • keepAliveTime:救急线程最大存活时间,当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等到 keepAliveTime 时间超过销毁

  • unit:keepAliveTime 参数的时间单位

  • workQueue:阻塞队列,存放被提交但尚未被执行的任务

  • threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字

  • handler:拒绝策略,线程到达最大线程数仍有新任务时会执行拒绝策略

    RejectedExecutionHandler 下有 4 个实现类:

    • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略
    • CallerRunsPolicy:让调用者运行的调节机制,将某些任务回退到调用者,从而降低新任务的流量
    • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
    • DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务

    补充:其他框架拒绝策略

    • Dubbo:在抛出 RejectedExecutionException 异常前记录日志,并 dump 线程栈信息,方便定位问题
    • Netty:创建一个新线程来执行任务
    • ActiveMQ:带超时等待(60s)尝试放入队列
    • PinPoint:它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
实现类

Executors 提供了四种线程池的创建:

newFixedThreadPool

newCachedThreadPool

newSingleThreadExecutor

newScheduledThreadPool


  • newFixedThreadPool

    • ```java
      public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue());
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12

      - 核心线程数 == 最大线程数(**没有救急线程被创建**),因此也无需超时时间
      - LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为 `Integer.MAX_VALUE`,也就是**无界队列**,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出)
      - 适用于任务量已知,相对耗时的长期任务

      - `newCachedThreadPool`

      - ```java
      public static ExecutorService newCachedThreadPool() {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>());
      }
      • 核心线程数是 0, 最大线程数是 29 个 1,全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 OOM
      • SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
      • 适合任务数比较密集,但每个任务执行时间较短的情况
  • newSingleThreadExecutor

    • ```java
      public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue()));
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93

      - 保证所有任务按照**指定顺序执行**,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放

      对比:

      - **创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,线程池会新建一个线程,保证池的正常工作**

      - Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法

      原因:父类不能直接调用子类中的方法,需要反射或者创建对象的方式,可以调用子类静态方法

      - Executors.newFixedThreadPool(1) 初始时为 1,可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

      ##### 提交方法

      ExecutorService 类 API:

      | 方法 | 说明 |
      | ------------------------------------------------------------ | ------------------------------------------------------------ |
      | void execute(Runnable command) | 执行任务(Executor 类 API) |
      | Future<?> submit(Runnable task) | 提交任务 task() |
      | Future submit(Callable task) | 提交任务 task,用返回值 Future 获得任务执行结果 |
      | List<Future> invokeAll(Collection<? extends Callable> tasks) | 提交 tasks 中所有任务 |
      | List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) | 提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常 |
      | T invokeAny(Collection<? extends Callable> tasks) | 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 |

      execute 和 submit 都属于线程池的方法,对比:

      - execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是**封装成 FutureTask,然后调用 execute 执行**
      - execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出

      ##### 关闭方法

      ExecutorService 类 API:

      | 方法 | 说明 |
      | ----------------------------------------------------- | ------------------------------------------------------------ |
      | void shutdown() | 线程池状态变为 SHUTDOWN,**等待任务执行完后关闭线程池**,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定ren'wu) |
      | List shutdownNow() | 线程池状态变为 STOP,用 interrupt **中断正在执行的任务**,直接关闭线程池,不会接收新任务,会将队列中的任务返回 |
      | boolean isShutdown() | 不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true |
      | boolean isTerminated() | 线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true |
      | boolean awaitTermination(long timeout, TimeUnit unit) | 调用 shutdown 后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待 |

      ##### 工作线程模式

      **让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务**。也可以将其归类为分工模式,它的典型实现
      就是线程池,也体现了经典设计模式中的享元模式。

      注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

      **案例:**

      ```java
      public class TestStarvation {

      static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
      static Random RANDOM = new Random();

      static String cooking() {
      return MENU.get(RANDOM.nextInt(MENU.size()));
      }

      public static void main(String[] args) {
      ExecutorService executorService = Executors.newFixedThreadPool(2);

      executorService.execute(() -> {
      log.debug("处理点餐...");
      Future<String> f = executorService.submit(() -> {
      log.debug("做菜");
      return cooking();
      });
      try {
      log.debug("上菜: {}", f.get());
      } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
      }
      });
      /*
      executorService.execute(() -> {
      log.debug("处理点餐...");
      Future<String> f = executorService.submit(() -> {
      log.debug("做菜");
      return cooking();
      });
      try {
      log.debug("上菜: {}", f.get());
      } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
      }
      });
      */
      }
      }

固定大小线程池会有饥饿现象

  • 两个工人是同一个线程池中的两个线程
  • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
    • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
    • 后厨做菜:没啥说的,做就是了
  • 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
  • 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿

解决办法:不同的任务类型,采用不同的线程池

创建多少线程池合适 ?

  • CPU 密集型运算

  • 通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因
    导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

  • I/O 密集型运算

  • CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程
    RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

  • 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

异常处理

execute 会直接抛出任务执行时的异常,submit 会吞掉异常,有两种处理方法

方法 1:主动捉异常

1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
System.out.println("task1");
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
}
});

方法 2:使用 Future 对象

1
2
3
4
5
6
7
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<?> future = pool.submit(() -> {
System.out.println("task1");
int i = 1 / 0;
return true;
});
System.out.println(future.get());
Fork/Join
  • Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
  • Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
  • Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask<Integer> {
int n;

public AddTask1(int n) {
this.n = n;
}

@Override
public String toString() {
return "{" + n + '}';
}

@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
log.debug("join() {}", n);
return n;
}

// 将任务进行拆分(fork)
AddTask1 t1 = new AddTask1(n - 1);
t1.fork();
log.debug("fork() {} + {}", n, t1);

// 合并(join)结果
int result = n + t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}

JUC

AQS(AbstractQueuedSynchronizer) 原理

  • 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
    • 队列同步器是用来构建锁的基础框架,如 ReentrantLock 和 ReentrantReadWriteLock 中的同步器 Sync 就是对队列同步器的具体实现。同步器 Sync 帮助锁隐藏了实现细节以及简化了锁的实现,开发者只需要关心锁的具体使用方法就行。
  • AQS 用状态属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

读写锁

ReentrantReadWriteLock

  • 当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select … from … lock in share mode
  • 读-读 可并发
  • 读-写 / 写-写 互斥
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class DataContainer {

private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
sleep(1);
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}

public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
sleep(1);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}

}
StampedLock
  • 该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用加解读锁

  • 乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次戳校验

    • 如果校验通过,表示这期间确实没有写操作,数据可以安全使用。
    • 如果校验没通过,需要重新获取读锁,保证数据安全。

Semaphore

信号量,用来限制能同时访问共享资源的线程上限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 3. 获取许可
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}

CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。

其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);

new Thread(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();

new Thread(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();

new Thread(() -> {
log.debug("begin...");
sleep(1.5);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();

log.debug("waiting...");
latch.await();
log.debug("wait end...");
}

CyclicBarrier

  • 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』
  • 每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行.
  • 可重复使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行

new Thread(()->{
System.out.println("线程1开始.."+new Date());
try {
cb.await(); // 当个数不足时,等待
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..."+new Date());
}).start();

new Thread(()->{
System.out.println("线程2开始.."+new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
try {
cb.await(); // 2 秒后,线程个数够2,继续运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..."+new Date());
}).start();

注意 CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的

CyclicBarrier 可以被比喻为『人满发车』

线程安全集合类