相关概念 进程:程序是静止的,进程实体的运行过程就是进程,是系统进行资源分配的基本单位
进程的特征:并发性、异步性、动态性、独立性、结构性
线程 :线程是属于进程的,是一个基本的 CPU 执行单元,是程序执行流的最小单元。线程是进程中的一个实体,是系统独立调度的基本单位 ,线程本身不拥有系统资源,只拥有一点在运行中必不可少的资源,与同属一个进程的其他线程共享进程所拥有的全部资源
关系:一个进程可以包含多个线程,这就是多线程,比如看视频是进程,图画、声音、广告等就是多个线程
线程的作用:使多道程序更好的并发执行,提高资源利用率和系统吞吐量,增强操作系统的并发性能
并发并行:
并行:在同一时刻,有多个指令在多个 CPU 上同时执行
并发:在同一时刻,有多个指令在单个 CPU 上交替执行
同步异步:
需要等待结果返回,才能继续运行就是同步
不需要等待结果返回,就能继续运行就是异步
线程进程对比:
进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
进程拥有共享的资源,如内存空间等,供其内部的线程共享
进程间通信较为复杂
同一台计算机的进程通信称为 IPC(Inter-process communication)
信号量:信号量是一个计数器,用于多进程对共享数据的访问,解决同步相关的问题并避免竞争条件
共享存储:多个进程可以访问同一块内存空间,需要使用信号量用来同步对共享存储的访问
管道通信:管道是用于连接一个读进程和一个写进程以实现它们之间通信的一个共享文件 pipe 文件,该文件同一时间只允许一个进程访问,所以只支持
半双工通信
匿名管道(Pipes):用于具有亲缘关系的父子进程间或者兄弟进程之间的通信
命名管道(Names Pipes):以磁盘文件的方式存在,可以实现本机任意两个进程通信,遵循 FIFO
消息队列:内核中存储消息的链表,由消息队列标识符标识,能在不同进程之间提供
全双工通信
,对比管道:
匿名管道存在于内存中的文件;命名管道存在于实际的磁盘介质或者文件系统;消息队列存放在内核中,只有在内核重启(操作系统重启)或者显示地删除一个消息队列时,该消息队列才被真正删除
读进程可以根据消息类型有选择地接收消息,而不像 FIFO 那样只能默认地接收
不同计算机之间的进程通信 ,需要通过网络,并遵守共同的协议,例如 HTTP
套接字:与其它通信机制不同的是,可用于不同机器间的互相通信
线程通信相对简单,因为线程之间共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
Java 中的通信机制 :volatile、等待/通知机制、join 方式、InheritableThreadLocal、MappedByteBuffer
线程更轻量,线程上下文切换成本一般上要比进程上下文切换低
Java线程 实现多线程的方式 1.继承Thread类,重写该类的run()方法 Thread 构造器:
public Thread()
public Thread(String name)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class ThreadDemo { public static void main (String[] args) { Thread t = new MyThread (); t.start(); for (int i = 0 ; i < 100 ; i++ ){ System.out.println("main线程" + i) } } }class MyThread extends Thread { @Override public void run () { for (int i = 0 ; i < 100 ; i++ ) { System.out.println("子线程输出:" +i) } } }
2.实现Runnable接口,并重写该接口的run()方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class ThreadDemo { public static void main (String[] args) { Runnable target = new MyRunnable (); Thread t1 = new Thread (target,"1号线程" ); t1.start(); Thread t2 = new Thread (target); } }public class MyRunnable implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; i++ ){ System.out.println(Thread.currentThread().getName() + "->" + i); } } }
3.实现 Callable 接口:接口创建线程。
定义一个线程任务类实现 Callable 接口,申明线程执行的结果类型
重写线程任务类的 call 方法,这个方法可以直接返回执行的结果
创建一个 Callable 的线程任务对象
把 Callable 的线程任务对象包装成一个未来任务对象
把未来任务对象包装成线程对象
调用线程的 start() 方法启动线程
public FutureTask(Callable<V> callable)
:未来任务对象,在线程执行完后得到线程的执行结果
FutureTask 就是 Runnable 对象,因为 Thread 类只能执行 Runnable 实例的任务对象 ,所以把 Callable 包装成未来任务对象
线程池部分详解了 FutureTask 的源码
public V get()
:同步等待 task 执行完毕的结果,如果在线程中获取另一个线程执行结果,会阻塞等待,用于线程同步
get() 线程会阻塞等待任务执行完成
run() 执行完后会把结果设置到 FutureTask 的一个成员变量,get() 线程可以获取到该变量的值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class MyThread implements Callable <String> { private int count = 20 ; @Override public String call () throws Exception { for (int i = count; i > 0 ; i--) { System.out.println(Thread.currentThread().getName()+"当前票数:" + i); } return "sale out" ; } public static void main (String[] args) throws InterruptedException, ExecutionException { Callable<String> callable =new MyThread (); FutureTask <String>futureTask=new FutureTask <>(callable); Thread mThread=new Thread (futureTask); Thread mThread2=new Thread (futureTask); Thread mThread3=new Thread (futureTask); mThread.start(); mThread2.start(); mThread3.start(); System.out.println(futureTask.get()); } }
4.通过线程池启动多线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class Test { public static void main (String[] args) { ExecutorService ex=Executors.newFixedThreadPool(5 ); for (int i=0 ;i<5 ;i++) { ex.submit(new Runnable () { @Override public void run () { for (int j=0 ;j<10 ;j++) { System.out.println(Thread.currentThread().getName()+j); } } }); } ex.shutdown(); } }
常用方法 start和run
直接调用 run 是在主线程中执行了 run,没有启动新的线程
使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码
sleep 与 yield
sleep:
调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)
其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
睡眠结束后的线程未必会立刻得到执行
建议用 TimeUnit(时间单位) 的 sleep 代替 Thread 的 sleep 来获得更好的可读性
yield
调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程
具体的实现依赖于操作系统的任务调度器
线程优先级
线程优先级会提示(hint)调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它
如果 cpu 比较忙,那么优先级高的线程会获得更多的时间片,但 cpu 闲时,优先级几乎没作用
在没有利用 cpu 来计算时,不要让 while(true) 空转浪费 cpu,这时可以使用 yield 或 sleep 来让出 cpu 的使用权 给其他程序
1 2 3 4 5 6 7 while (true ) { try { Thread.sleep(50 ); } catch (InterruptedException e) { e.printStackTrace(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static int r = 0 ;public static void main (String[] args) throws InterruptedException { test1(); }private static void test1 () throws InterruptedException { log.debug("开始" ); Thread t1 = new Thread (() -> { log.debug("开始" ); sleep(1 ); log.debug("结束" ); r = 10 ; }); t1.start(); t1.join(); log.debug("结果为:{}" , r); log.debug("结束" ); }
需要等待结果返回,才能继续运行就是同步
不需要等待结果返回,就能继续运行就是异步
在一个线程 T1 中如何“优雅”终止线程 T2?这里的【优雅】指的是给 T2 一个料理后事的机会。
1 2 3 4 5 6 7 8 9 graph TD W("while(true)") --> a a("有没有被打断?") --是--> b(料理后事) b --> c((结束循环)) a--否 --> d(睡眠2s) d --无异常--> e(执行监控记录) d --有异常--> i(设置打断标记) i-->W e-->W
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class TPTInterrupt { private Thread thread; public void start () { thread = new Thread (() -> { while (true ) { Thread current = Thread.currentThread(); if (current.isInterrupted()) { log.debug("料理后事" ); break ; } try { Thread.sleep(1000 ); log.debug("将结果保存" ); } catch (InterruptedException e) { current.interrupt(); } } },"监控线程" ); thread.start(); } public void stop () { thread.interrupt(); } }
1 2 3 4 5 6 TPTInterrupt t = new TPTInterrupt (); t.start(); Thread.sleep(3500 ); log.debug("stop" ); t.stop();
守护线程
默认情况下,Java 进程需要等待所有线程都运行结束,才会结束。
有一种特殊的线程叫做守护线程,只要其它非守护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。
1 2 3 4 5 6 7 8 9 10 11 12 log.debug("开始运行..." );Thread t1 = new Thread (() -> { log.debug("开始运行..." ); sleep(2 ); log.debug("运行结束..." ); }, "daemon" ); t1.setDaemon(true ); t1.start(); sleep(1 ); log.debug("运行结束..." );
五种状态
【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联
【可运行状态】(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
【运行状态】指获取了 CPU 时间片运行中的状态
当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
【阻塞状态】
如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入【阻塞状态】
等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态
synchronized synchronized 实际是用对象锁 保证了临界区内代码的原子性 ,临界区内的代码对外是不可分割的,不会被线程切换所打断。
加在对象上:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class Room { int value = 0 ; public void increment () { synchronized (this ) { value++; } } public void decrement () { synchronized (this ) { value--; } } public int get () { synchronized (this ) { return value; } } }@Slf4j public class Test1 { public static void main (String[] args) throws InterruptedException { Room room = new Room (); Thread t1 = new Thread (() -> { for (int j = 0 ; j < 5000 ; j++) { room.increment(); } }, "t1" ); Thread t2 = new Thread (() -> { for (int j = 0 ; j < 5000 ; j++) { room.decrement(); } }, "t2" ); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("count: {}" , room.get()); } }
加在方法上:
1 2 3 4 5 6 7 8 9 10 11 12 13 class Test { public synchronized void test () { } } 等价于class Test { public void test () { synchronized (this ) { } } }
加在静态方法上:
1 2 3 4 5 6 7 8 9 10 11 12 13 class Test { public synchronized static void test () { } } 等价于class Test { public static void test () { synchronized (Test.class) { } } }
共享模型之管程 管程可以看做⼀个软件模块,它是将共享的变量和对于这些共享变量的操作封装起来,形成⼀个具有⼀定接口的功能模块,进程可以调⽤管程来实现进程级别的并发控制。
变量的线程安全分析 成员变量和静态变量是否线程安全?
局部变量是否线程安全?
局部变量是线程安全的
但局部变量引用的对象则未必
如果该对象没有逃离方法的作用访问,它是线程安全的
如果该对象逃离方法的作用范围,需要考虑线程安全
常见线程安全类
String
Integer
StringBuffffer
Random
Vector
Hashtable
java.util.concurrent 包下的类
这里说它们是线程安全的是指,多个线程调用它们同一个实例 的某个方法时,是线程安全的。
但注意它们多个方法的组合不是原子的
1 2 3 4 5 6 7 8 sequenceDiagram participant t1 as 线程1 participant t2 as 线程2 participant table t1 ->> table : get("key")==nu11 t2 ->> table : get("key")==nu1l t2 ->> table : put("key", v2) t1 ->> table : put("key", v1)
String、Integer 等都是不可变类,因为其内部的状态不可以改变,因此它们的方法都是线程安全的
原理之 Monitor(锁)
刚开始 Monitor 中 Owner 为 null
当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor中只能有一个 Owner
在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行 synchronized(obj),就会进入EntryList(阻塞队列) BLOCKED
Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争的时是非公平的
图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析
synchronized 必须是进入同一个对象的 monitor 才有上述的效果
不加 synchronized 的对象不会关联监视器,不遵从以上规则
Monitor是操作系统C++实现的
轻量级锁 使用场景:如果一个对象虽然有多线程要加锁,但加锁的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。
创建 锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的Mark Word
让锁记录中 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录
如果 cas 替换成功,对象头中存储了 锁记录地址和状态 00 ,表示由该线程给对象加锁,这时图示如下
重量级锁(膨胀) 如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。
当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁这时 Thread-1 加轻量级锁失败,进入锁膨胀流程
即为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址 然后自己进入 Monitor 的 EntryList BLOCKED
自旋优化 重量级锁竞争的时候,还可以使用自旋(循环尝试获取重量级锁)来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。 (进入阻塞再恢复,会发生上下文切换,比较耗费性能)
偏向锁 Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS 。以后只要不发生竞争,这个对象就归该线程所有
锁消除 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Fork(1) @BenchmarkMode(Mode.AverageTime) @Warmup(iterations=3) @Measurement(iterations=5) @OutputTimeUnit(TimeUnit.NANOSECONDS) public class MyBenchmark { static int x = 0 ; @Benchmark public void a () throws Exception { x++; } @Benchmark public void b () throws Exception { Object o = new Object (); synchronized (o) { x++; } } }
总结:synchronized默认是使用轻量级锁,轻量级锁发生抢占时会升级为重锁,然后阻塞队列可以通过自旋优化来尽可能减少阻塞
wait/notify
Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
BLOCKED 线程会在 Owner 线程释放锁时唤醒
WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入EntryList 重新竞争
obj.wait() 让进入 object 监视器的线程到 waitSet 等待
obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法
1 2 3 4 5 6 7 8 9 10 11 synchronized (lock) { while (条件不成立) { lock.wait(); } }synchronized (lock) { lock.notifyAll(); }
保护性暂停 (同步)模式之保护性暂停
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
有一个结果 需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
JDK 中,join 的实现、Future 的实现,采用的就是此模式,因为要等待另一方的结果,因此归类到同步模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class GuardedObject { private Object response; private final Object lock = new Object (); public Object get () { synchronized (lock) { while (response == null ) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public void complete (Object response) { synchronized (lock) { this .response = response; lock.notifyAll(); } } }
拓展 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class GuardedObject { private int id; public GuardedObject (int id) { this .id = id; } public int getId () { return id; } private Object response; public Object get (long timeout) { synchronized (this ) { long begin = System.currentTimeMillis(); long passedTime = 0 ; while (response == null ) { long waitTime = timeout - passedTime; if (timeout - passedTime <= 0 ) { break ; } try { this .wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } } public void complete (Object response) { synchronized (this ) { this .response = response; this .notifyAll(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class Mailboxes { private static Map<Integer, GuardedObject> boxes = new Hashtable <>(); private static int id = 1 ; private static synchronized int generateId () { return id++; } public static GuardedObject getGuardedObject (int id) { return boxes.remove(id); } public static GuardedObject createGuardedObject () { GuardedObject go = new GuardedObject (generateId()); boxes.put(go.getId(), go); return go; } public static Set<Integer> getIds () { return boxes.keySet(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class Postman extends Thread { private int id; private String mail; public Postman (int id, String mail) { this .id = id; this .mail = mail; } @Override public void run () { GuardedObject guardedObject = Mailboxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}" , id, mail); guardedObject.complete(mail); } }
1 2 3 4 5 6 7 8 9 10 11 class People extends Thread { @Override public void run () { GuardedObject guardedObject = Mailboxes.createGuardedObject(); log.debug("开始收信 id:{}" , guardedObject.getId()); Object mail = guardedObject.get(5000 ); log.debug("收到信 id:{}, 内容:{}" , guardedObject.getId(), mail); } }
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) throws InterruptedException { for (int i = 0 ; i < 3 ; i++) { new People ().start(); } Sleeper.sleep(1 ); for (Integer id : Mailboxes.getIds()) { new Postman (id, "内容" + id).start(); } }
1 2 3 4 5 6 7 8 9 10 运行结果10 :35 :05.689 c.People [Thread-1 ] - 开始收信 id:3 10 :35 :05.689 c.People [Thread-2 ] - 开始收信 id:1 10 :35 :05.689 c.People [Thread-0 ] - 开始收信 id:2 10 :35 :06.688 c.Postman [Thread-4 ] - 送信 id:2 , 内容:内容2 10 :35 :06.688 c.Postman [Thread-5 ] - 送信 id:1 , 内容:内容1 10 :35 :06.688 c.People [Thread-0 ] - 收到信 id:2 , 内容:内容2 10 :35 :06.688 c.People [Thread-2 ] - 收到信 id:1 , 内容:内容1 10 :35 :06.688 c.Postman [Thread-3 ] - 送信 id:3 , 内容:内容3 10 :35 :06.689 c.People [Thread-1 ] - 收到信 id:3 , 内容:内容3
生产者/消费者模式 (异步)模式之生产者/消费者
与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
消费队列可以用来平衡生产和消费的线程资源
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK 中各种阻塞队列,采用的就是这种模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 class Message { private int id; private Object message; public Message (int id, Object message) { this .id = id; this .message = message; } public int getId () { return id; } public Object getMessage () { return message; } }class MessageQueue { private LinkedList<Message> queue; private int capacity; public MessageQueue (int capacity) { this .capacity = capacity; queue = new LinkedList <>(); } public Message take () { synchronized (queue) { while (queue.isEmpty()) { log.debug("没货了, wait" ); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = queue.removeFirst(); queue.notifyAll(); return message; } } public void put (Message message) { synchronized (queue) { while (queue.size() == capacity) { log.debug("库存已达上限, wait" ); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(message); queue.notifyAll(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 MessageQueue messageQueue = new MessageQueue (2 );for (int i = 0 ; i < 4 ; i++) { int id = i; new Thread (() -> { try { log.debug("download..." ); List<String> response = Downloader.download(); log.debug("try put message({})" , id); messageQueue.put(new Message (id, response)); } catch (IOException e) { e.printStackTrace(); } }, "生产者" + i).start(); }new Thread (() -> { while (true ) { Message message = messageQueue.take(); List<String> response = (List<String>) message.getMessage(); log.debug("take message({}): [{}] lines" , message.getId(), response.size()); } }, "消费者" ).start();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 结果10 :48 :38.070 [生产者3 ] c.TestProducerConsumer - download...10 :48 :38.070 [生产者0 ] c.TestProducerConsumer - download...10 :48 :38.070 [消费者] c.MessageQueue - 没货了, wait10 :48 :38.070 [生产者1 ] c.TestProducerConsumer - download...10 :48 :38.070 [生产者2 ] c.TestProducerConsumer - download...10 :48 :41.236 [生产者1 ] c.TestProducerConsumer - try put message (1 ) 10 :48 :41.237 [生产者2 ] c.TestProducerConsumer - try put message (2 ) 10 :48 :41.236 [生产者0 ] c.TestProducerConsumer - try put message (0 ) 10 :48 :41.237 [生产者3 ] c.TestProducerConsumer - try put message (3 ) 10 :48 :41.239 [生产者2 ] c.MessageQueue - 库存已达上限, wait10 :48 :41.240 [生产者1 ] c.MessageQueue - 库存已达上限, wait10 :48 :41.240 [消费者] c.TestProducerConsumer - take message (0 ) : [3 ] lines10 :48 :41.240 [生产者2 ] c.MessageQueue - 库存已达上限, wait10 :48 :41.240 [消费者] c.TestProducerConsumer - take message (3 ) : [3 ] lines10 :48 :41.240 [消费者] c.TestProducerConsumer - take message (1 ) : [3 ] lines10 :48 :41.240 [消费者] c.TestProducerConsumer - take message (2 ) : [3 ] lines10 :48 :41.240 [消费者] c.MessageQueue - 没货了, wait
park与unpark
暂停当前线程
恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
与 Object 的 wait & notify 相比
wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】
park & unpark 可以先 unpark,而 wait & notify 不能先 notify
死锁: 在两个或多个并发进程中,如果每个进程持有某种资源而又都等待着别的进程释放它或它们现在保持着的资源,否则就不能向前推进,此时每个进程都占用了一定的资源但又都不能向前推进,称这一组进程产生了死锁
活锁: 两个线程互相改变对方的结束条件,最后谁也无法结束
饥饿: 当线程需要某些资源(通常是CPU资源) , 但却始终获得不到.造成饥饿的情况
ReentrantLock 相对于 synchronized 它具备如下特点
可中断
可以设置超时时间
可以设置为公平锁 (避免饥饿)
支持多个条件变量
与 synchronized 一样,都支持可重入
基本语法
1 2 3 4 5 6 7 8 reentrantLock.lock();try { } finally { reentrantLock.unlock(); }
可重入 可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
可打断 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 ReentrantLock lock = new ReentrantLock ();Thread t1 = new Thread (() -> { log.debug("启动..." ); try { lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.debug("等锁的过程中被打断" ); return ; } try { log.debug("获得了锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("获得了锁" ); t1.start();try { sleep(1 ); log.debug("执行打断" ); t1.interrupt(); } finally { lock.unlock(); }
锁超时 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 ReentrantLock lock = new ReentrantLock ();Thread t1 = new Thread (() -> { log.debug("启动..." ); try { if (!lock.tryLock(1 , TimeUnit.SECONDS)) { log.debug("获取等待 1s 后失败,返回" ); return ; } } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("获得了锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("获得了锁" ); t1.start();try { sleep(2 ); } finally { lock.unlock(); }
条件变量 synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待 ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
synchronized 是那些不满足条件的线程都在一间休息室等消息
而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 static ReentrantLock lock = new ReentrantLock ();static Condition waitCigaretteQueue = lock.newCondition();static Condition waitbreakfastQueue = lock.newCondition();static volatile boolean hasCigrette = false ;static volatile boolean hasBreakfast = false ;public static void main (String[] args) { new Thread (() -> { try { lock.lock(); while (!hasCigrette) { try { waitCigaretteQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("等到了它的烟" ); } finally { lock.unlock(); } }).start(); new Thread (() -> { try { lock.lock(); while (!hasBreakfast) { try { waitbreakfastQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("等到了它的早餐" ); } finally { lock.unlock(); } }).start(); sleep(1 ); sendBreakfast(); sleep(1 ); sendCigarette(); }private static void sendCigarette () { lock.lock(); try { log.debug("送烟来了" ); hasCigrette = true ; waitCigaretteQueue.signal(); } finally { lock.unlock(); } }private static void sendBreakfast () { lock.lock(); try { log.debug("送早餐来了" ); hasBreakfast = true ; waitbreakfastQueue.signal(); } finally { lock.unlock(); } }
共享模型之内存
原子性 - 保证指令不会受到线程上下文切换的影响
可见性 - 保证指令不会受 cpu 缓存的影响
有序性 - 保证指令不会受 cpu 指令并行优化的影响
可见性 先来看个问题:退不出的死循环?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static boolean run = true ;public static void main (String[] args) throws InterruptedException { Thread t = new Thread (()->{ while (run){ } }); t.start(); sleep(1 ); run = false ; }
初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存。
因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中,减少对主存中 run 的访问,提高效率
1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值
解决办法 :
volatile(易变关键字)
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存
可见性: 它保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可见,但不能保证原子性,仅用在一个写线程,多个读线程的情况(也就是保证读取的是最新的,但不能阻止指令重排 )
注意 synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作,性能相对更低
如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到对 run 变量的修改了,想一想为什么? 因为其内部包含了synchronized 的使用
犹豫模式 Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class MonitorService { private volatile boolean starting; public void start () { log.info("尝试启动监控线程..." ); synchronized (this ) { if (starting) { return ; } starting = true ; } } }
有序性 JVM 会在不影响正确性的前提下,可以调整语句的执行顺序,思考下面一段代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static int i;static int j; i = ...; j = ...; i = ...; j = ...; ------------------------------------------ j = ...; i = ...;
这种特性称之为『指令重排』,多线程下『指令重排』会影响正确性。
volatile原理
如何保证可见性?
volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)
对 volatile 变量的写指令后会加入写屏障 ,写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
对 volatile 变量的读指令前会加入读屏障,而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
如何保证有序性?
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
还是那句话,不能解决指令交错
写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证读跑到它前面去
而有序性的保证也只是保证了本线程内相关代码不被重排序
共享模型之无锁 CAS与volatile 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class AccountSafe implements Account { private AtomicInteger balance; public AccountSafe (Integer balance) { this .balance = new AtomicInteger (balance); } @Override public Integer getBalance () { return balance.get(); } @Override public void withdraw (Integer amount) { while (true ) { int prev = balance.get(); int next = prev - amount; if (balance.compareAndSet(prev, next)) { break ; } } } }
前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。
CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被
线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想 :最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
原子整数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 AtomicInteger i = new AtomicInteger (0 ); System.out.println(i.getAndIncrement()); System.out.println(i.incrementAndGet()); System.out.println(i.decrementAndGet()); System.out.println(i.getAndDecrement()); System.out.println(i.getAndAdd(5 )); System.out.println(i.addAndGet(-5 )); System.out.println(i.getAndUpdate(p -> p - 2 )); System.out.println(i.updateAndGet(p -> p + 2 )); System.out.println(i.getAndAccumulate(10 , (p, x) -> p + x)); System.out.println(i.accumulateAndGet(-10 , (p, x) -> p + x));
原子引用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class DecimalAccountSafeCas implements DecimalAccount { AtomicReference<BigDecimal> ref; public DecimalAccountSafeCas (BigDecimal balance) { ref = new AtomicReference <>(balance); } @Override public BigDecimal getBalance () { return ref.get(); } @Override public void withdraw (BigDecimal amount) { while (true ) { BigDecimal prev = ref.get(); BigDecimal next = prev.subtract(amount); if (ref.compareAndSet(prev, next)) { break ; } } } }
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况
如果主线程希望: 只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
可使用AtomicStampedReference 实现
AtomicStampedReference ref = new AtomicStampedReference<>(“A”, 0);
不关心引用变量更改了几次,只是单纯的关心是否更改过,可使用 AtomicMarkableReference
AtomicMarkableReference ref = new AtomicMarkableReference<>(bag, true);
原子数组 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private static <T> void demo ( Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer ) { List<Thread> ts = new ArrayList <>(); T array = arraySupplier.get(); int length = lengthFun.apply(array); for (int i = 0 ; i < length; i++) { ts.add(new Thread (() -> { for (int j = 0 ; j < 10000 ; j++) { putConsumer.accept(array, j%length); } })); } ts.forEach(t -> t.start()); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); printConsumer.accept(array); }
原子数组实现:
1 2 3 4 5 6 demo( ()-> new AtomicIntegerArray (10 ), (array) -> array.length(), (array, index) -> array.getAndIncrement(index), array -> System.out.println(array) );
原子字段更新器
利用字段更新器,可以针对对象的某个域(Field)进行原子操作 ,只能配合 volatile 修饰的字段使用, 否则会出现异常 Exception in thread “main” java.lang.IllegalArgumentException: Must be volatile type
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Test5 { private volatile int field; public static void main (String[] args) { AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field" ); Test5 test5 = new Test5 (); fieldUpdater.compareAndSet(test5, 0 , 10 ); System.out.println(test5.field); fieldUpdater.compareAndSet(test5, 10 , 20 ); System.out.println(test5.field); fieldUpdater.compareAndSet(test5, 10 , 30 ); System.out.println(test5.field); } }
共享模型之不可变 如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改
不可变设计 1 2 3 4 5 6 7 8 9 public final class String implements java .io.Serializable, Comparable<String>, CharSequence { private final char value[]; private int hash; }
发现该类、类中所有属性都是 final 的
属性用 final 修饰保证了该属性是只读的,不能修改
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。 这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】
final 发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况
无状态 在 web 阶段学习时,设计 Servlet 时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的
因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为【无状态】
并发工具 线程池
一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作
线程池作用:
降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
线程池的核心思想:线程复用 ,同一个线程可以被重复使用,来处理多个任务
池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销
原理
创建线程池,这时没有创建线程(懒惰 ),等待提交过来的任务请求,调用 execute 方法才会创建线程
当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务
如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列
如果这时队列满了且正在运行的线程数量还小于 maximumPoolSize
,那么会创建非核心线程立刻运行这个任务 ,对于阻塞队列中的任务不公平。这是因为创建每个 Worker(线程)对象会绑定一个初始任务,启动 Worker 时会优先执行
如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略 来执行
当一个线程完成任务时,会从队列中取下一个任务来执行
当一个线程空闲超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后最终会收缩到 corePoolSize 大小
创建方式 ThreadPoolExecutor :
构造方法:
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数介绍:
corePoolSize:核心线程数,定义了最小可以同时运行的线程数量
maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值又叫救急线程数
keepAliveTime:救急线程最大存活时间,当线程池中的线程数量大于 corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等到 keepAliveTime
时间超过销毁
unit:keepAliveTime
参数的时间单位
workQueue:阻塞队列,存放被提交但尚未被执行的任务
threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字
handler:拒绝策略,线程到达最大线程数仍有新任务时会执行拒绝策略
RejectedExecutionHandler 下有 4 个实现类:
AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略
CallerRunsPolicy:让调用者运行的调节机制,将某些任务回退到调用者,从而降低新任务的流量
DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务
补充:其他框架拒绝策略
Dubbo:在抛出 RejectedExecutionException 异常前记录日志,并 dump 线程栈信息,方便定位问题
Netty:创建一个新线程来执行任务
ActiveMQ:带超时等待(60s)尝试放入队列
PinPoint:它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
实现类 Executors 提供了四种线程池的创建:
newFixedThreadPool
newCachedThreadPool
newSingleThreadExecutor
newScheduledThreadPool
newFixedThreadPool
```java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }1 2 3 4 5 6 7 8 9 10 11 12 - 核心线程数 == 最大线程数(**没有救急线程被创建**),因此也无需超时时间 - LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为 `Integer.MAX_VALUE`,也就是**无界队列**,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出) - 适用于任务量已知,相对耗时的长期任务 - `new CachedThreadPool ` - ```java public static ExecutorService new CachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60 L, TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
核心线程数是 0, 最大线程数是 29 个 1,全部都是救急线程 (60s 后可以回收),可能会创建大量线程,从而导致 OOM
SynchronousQueue 作为阻塞队列,没有容量 ,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
适合任务数比较密集,但每个任务执行时间较短的情况
newSingleThreadExecutor
```java public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 - 保证所有任务按照* * 指定顺序执行* * ,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放 对比: - * * 创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,线程池会新建一个线程,保证池的正常工作* * - Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法 原因:父类不能直接调用子类中的方法,需要反射或者创建对象的方式,可以调用子类静态方法 - Executors.newFixedThreadPool(1) 初始时为 1,可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改 ExecutorService 类 API: | 方法 | 说明 | | ------------------------------------------------------------ | ------------------------------------------------------------ | | void execute(Runnable command) | 执行任务(Executor 类 API) | | Future<?> submit(Runnable task) | 提交任务 task() | | Future submit(Callable task) | 提交任务 task,用返回值 Future 获得任务执行结果 | | List<Future> invokeAll(Collection<? extends Callable> tasks) | 提交 tasks 中所有任务 | | List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) | 提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常 | | T invokeAny(Collection<? extends Callable> tasks) | 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 | execute 和 submit 都属于线程池的方法,对比: - execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是* * 封装成 FutureTask,然后调用 execute 执行* * - execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出 ExecutorService 类 API: | 方法 | 说明 | | ----------------------------------------------------- | ------------------------------------------------------------ | | void shutdown() | 线程池状态变为 SHUTDOWN,**等待任务执行完后关闭线程池**,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定ren'wu) | | List shutdownNow() | 线程池状态变为 STOP,用 interrupt **中断正在执行的任务**,直接关闭线程池,不会接收新任务,会将队列中的任务返回 | | boolean isShutdown() | 不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true | | boolean isTerminated() | 线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true | | boolean awaitTermination(long timeout, TimeUnit unit) | 调用 shutdown 后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待 |* * 让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务* * 。也可以将其归类为分工模式,它的典型实现 就是线程池,也体现了经典设计模式中的享元模式。 注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率 * * 案例:* * ```java public class TestStarvation { static final List<String> MENU = Arrays.asList("地三鲜" , "宫保鸡丁" , "辣子鸡丁" , "烤鸡翅" ); static Random RANDOM = new Random(); static String cooking() { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(() -> { log.debug("处理点餐..." ); Future<String> f = executorService.submit(() -> { log.debug("做菜" ); return cooking(); }); try { log.debug("上菜: {}" , f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); /* executorService.execute(() -> { log.debug("处理点餐..."); Future<String> f = executorService.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); */ } }
固定大小线程池会有饥饿现象
两个工人是同一个线程池中的两个线程
他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
后厨做菜:没啥说的,做就是了
比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
解决办法:不同的任务类型,采用不同的线程池
创建多少线程池合适 ?
CPU 密集型运算
通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因 导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
异常处理 execute 会直接抛出任务执行时的异常,submit 会吞掉异常,有两种处理方法
方法 1:主动捉异常
1 2 3 4 5 6 7 8 9 ExecutorService executorService = Executors.newFixedThreadPool(1 ); pool.submit(() -> { try { System.out.println("task1" ); int i = 1 / 0 ; } catch (Exception e) { e.printStackTrace(); } });
方法 2:使用 Future 对象
1 2 3 4 5 6 7 ExecutorService executorService = Executors.newFixedThreadPool(1 ); Future<?> future = pool.submit(() -> { System.out.println("task1" ); int i = 1 / 0 ; return true ; }); System.out.println(future.get());
Fork/Join
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j(topic = "c.AddTask") class AddTask1 extends RecursiveTask <Integer> { int n; public AddTask1 (int n) { this .n = n; } @Override public String toString () { return "{" + n + '}' ; } @Override protected Integer compute () { if (n == 1 ) { log.debug("join() {}" , n); return n; } AddTask1 t1 = new AddTask1 (n - 1 ); t1.fork(); log.debug("fork() {} + {}" , n, t1); int result = n + t1.join(); log.debug("join() {} + {} = {}" , n, t1, result); return result; } }
JUC AQS(AbstractQueuedSynchronizer)
原理
全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
队列同步器是用来构建锁的基础框架 ,如 ReentrantLock 和 ReentrantReadWriteLock 中的同步器 Sync 就是对队列同步器的具体实现。同步器 Sync 帮助锁隐藏了实现细节以及简化了锁的实现 ,开发者只需要关心锁的具体使用方法就行。
AQS 用状态属性来表示资源的状态(分独占模式和共享模式 ),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
读写锁 ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select … from … lock in share mode
读-读 可并发
读-写 / 写-写 互斥
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class DataContainer { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock (); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read () { log.debug("获取读锁..." ); r.lock(); try { log.debug("读取" ); sleep(1 ); return data; } finally { log.debug("释放读锁..." ); r.unlock(); } } public void write () { log.debug("获取写锁..." ); w.lock(); try { log.debug("写入" ); sleep(1 ); } finally { log.debug("释放写锁..." ); w.unlock(); } } }
StampedLock
Semaphore 信号量,用来限制能同时访问共享资源的线程上限
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..." ); sleep(1 ); log.debug("end..." ); } finally { semaphore.release(); } }).start(); } }
CountdownLatch 用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (3 ); new Thread (() -> { log.debug("begin..." ); sleep(1 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); new Thread (() -> { log.debug("begin..." ); sleep(2 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); new Thread (() -> { log.debug("begin..." ); sleep(1.5 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); log.debug("waiting..." ); latch.await(); log.debug("wait end..." ); }
CyclicBarrier
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』
每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行.
可重复使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 CyclicBarrier cb = new CyclicBarrier (2 ); new Thread (()->{ System.out.println("线程1开始.." +new Date ()); try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..." +new Date ()); }).start();new Thread (()->{ System.out.println("线程2开始.." +new Date ()); try { Thread.sleep(2000 ); } catch (InterruptedException e) { } try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..." +new Date ()); }).start();
注意 CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的
CyclicBarrier 可以被比喻为『人满发车』
线程安全集合类