Java并发一文流

宋正兵 on 2021-06-27

1.什么是线程和进程?

进程就是程序执行一次的过程,使系统运行程序的基本单位。系统运行一个程序就是一个进程从创建、运行到消亡的过程。

线程是一个比进程更小的执行单位。一个进程在其运行过程中可以产生多个线程。多个线程可以共享进程的方法区资源,但是每个线程都有自己的程序计数器虚拟机栈本地方法栈

Java 程序中,main 线程执行 main 方法。一个 Java 程序的运行是 main 线程和多个其他线程同时运行

2.线程与进程的关系

一个进程中可以有多个线程,多个线程共享进程的方法区资源,但是每个线程都得有自己的程序计数器虚拟机栈本地方法栈

线程是进程划分成的更小的运行单位。线程和进程最大的不同在于基本上各进程是独立的,而各线程则不一定,因为同一进程中的线程极有可能会相互影响。

扩展:为什么程序计数器、虚拟机栈和本地方法栈是线程私有的呢?为什么堆和方法区是线程共享的呢?

程序计数器的作用是告诉当前线程下一行需要执行的代码在哪里,即告诉字节码解释器下一条指令的位置,应该保证每个线程所执行的内容相互独立,所以理所当然的是私有的。

如果执行的是 native 方法,那么程序计数器记录的是 undefined 地址,只有执行 Java 代码时程序计数器记录的才是下一条指令的地址。

虚拟机栈它是描述了 Java 方法执行的模型,即线程每调用一次方法就会在虚拟机栈中创建一个栈帧,每个栈帧内包含局部变量表和操作数栈、常量池引用等信息。从方法调用直至执行完成的过程,就对应着一个栈帧在 Java 虚拟机栈中入栈和出栈的过程。本地方法栈和 Java 虚拟机栈作用相同,只不过本地方法栈是为 native 方法服务的。为了保证线程中的局部变量不被别的线程访问,它们必须是私有的

方法区是所有线程共享的资源,其中堆是进程中最大的一块内存,用于存放新创建的对象,方法区主要用于保存已被加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。

3.并发和并行的区别

并发:同一时间段内,多个任务都在执行。

并行:单位时间内,多个任务同时执行。

4.为什么要使用多线程?

为了充分利用CPU资源,采用多线程的方式去同时完成几件事情而不互相干扰。在处理大量的IO操作时或处理的情况需要花费大量的时间时利用多线程也可以提高程序运行效率。并且 现在的系统可能会出现并发量很高的情况,多线程编程时开发高并发系统的基础,利用好线程机制可以大大提高系统整体的并发能力及性能。

5.使用多线程可能带来的问题?

并发编程的目的是为了提高程序的执行效率,但并不总是能提高程序运行速度,可能会出现的问题比如:内存泄漏、死锁、线程不安全等等。

6.线程的生命周期和状态

Java 线程的生命周期包括:初始状态(New)、运行状态(RUNNABLE)、阻塞状态(BLOCKED)、等待状态(WAITING)、超时等待状态(TIME_WAITING)、终止状态(TERMINATED)。

线程创建之后它将处于 初始状态(New),调用 start() 方法后开始运行,转换到 运行状态(RUNNABLE)(在操作系统中还会细分为可运行状态和运行状态,前者表示在等候被 CPU 调度运行,后者表示已经获取了时间片在运行中),当线程执行 wait() 方法之后,线程会进入 等待状态(WAITING)。进入等待状态的线程需要依靠其它线程的通知才能够返回到运行状态,而调用了 sleep(long millis) 方法或者 wait(long millis) 方法的线程会进入 超时等待状态(TIME_WAITING),当超时时间到达后线程将会返回运行状态(RUNNABLE)。当线程调用同步方法的时候,如果没有获取到锁,那么会进入到 阻塞状态(BLOCKED)。线程在执行完所有代码后将会进入到终止状态(TERMINATED)

7.什么是上下文切换?

当前线程在执行完 CPU 时间片切换到另一个线程之前会先保存自己的状态,以便下次再切换回这个线程时,可以再加载这个线程的状态。一个线程被暂停剥夺使用权,另外一个线程被选中开始或者继续运行的过程就是一次上下文切换

8.死锁

死锁的定义:多个线程同时被阻塞,它们都在等待某个资源被释放。由于线程被一直阻塞住,导致程序不能正常终止。

产生死锁的四个必要条件

  1. 互斥条件:该资源任意一个时刻只由一个线程占用。
  2. 请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
  3. 不剥夺条件:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源。
  4. 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

如何避免死锁?

通过破坏死锁产生的必要条件:

  1. 破坏请求与保持条件 :一次性申请所有的资源。
  2. 破坏不剥夺条件 :占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源。
  3. 破坏循环等待条件 :靠按序申请资源来预防。按某一顺序申请资源,释放资源则反序释放。破坏循环等待条件。

如何避免死锁?

避免死锁就是在资源分配时,借助于算法(比如银行家算法)对资源分配进行计算评估,使其进入安全状态。

安全状态指的是系统能够按照某种进行推进顺序(P1、P2、P3…..Pn)来为每个进程分配所需资源,直到满足每个进程对资源的最大需求,使每个进程都可顺利完成。

银行家算法就是计算系统目前的资源数量,通过先试探分配给进程资源,若能满足该进程执行的资源需求则表示安全,将该进程标记为可完成,然后回收资源数量并去找当前资源可以满足的下一个进程。若所有进程都可执行完毕,则系统处于安全状态,并根据可完成进程的分配顺序生成安全执行序列。

9.sleep()方法和wait()方法区别和共同点?

sleep() 方法和 wait() 方法都可以暂停线程的执行。但是 sleep() 方法会让线程从 RUNNABLE 状态转换到 TIME_WAITING 状态,并且不会释放锁,当超过等待时间后,线程将会被自动唤醒;wait() 方法会让线程从 RUNNABLE 状态转换到 WAITING 状态,会释放锁,但是需要别的线程调用同一个锁对象上的 notify() 方法或者 notifyAll() 方法才会被唤醒。

10. 调用start()方法时会执行run()方法,为什么不直接调用run()方法?

new 一个 Thread,线程进入了新建状态。调用 start() 方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。start() 会执行线程的相应准备工作,然后自动执行 run() 方法的内容,run() 方法的内容是真正的多线程工作。 但是,直接执行 run() 方法,会把 run() 方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以这并不是多线程工作。

总结: 调用 start() 方法方可启动线程并使线程进入就绪状态,直接执行 run() 方法的话不会以多线程的方式执行。

11.synchronized关键字

synchronized 关键字可以解决多个线程之间访问资源的同步性问题,保证它修饰的方法或者代码块在任意时刻只能有一个线程执行。

synchronized关键字的使用方法

  • 修饰方法:对当前对象实例加锁
  • 修饰静态方法:对当前类加锁,即当前 class 的锁
  • 修饰代码块:对指定对象加锁

双重校验锁实现单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
class Singleton {
private volatile static Singleton INSTANCE;
public static Singleton getInstance() {
if (INSTANCE == null) {
synchronized (this) {
if (INSTANCE == null) {
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}

构造方法可以使用synchronized关键字修饰吗?

不能,构造方法本身属于线程安全的,不存在同步的构造方法一说。

synchronized关键字的底层原理

字节码层面。

synchronized 修饰代码块时使用的是 monitorentermonitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。

synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法。

monitorenter — 将 lock 对象的 MarkWord 置为 Monitor 指针;
monitorexit — 将 lock 对象的 MarkWord 重置,唤醒 EntryList。

JDK1.6之后对synchronized关键字底层做了哪些优化?

JDK 1.6 对锁的实现引入了大量的优化,如偏向锁、轻量级锁、自旋锁、适应性自旋锁、锁消除等技术来减少锁操作的开销。

Java6及以上版本对synchronized的优化 - 蜗牛大师 - 博客园 (cnblogs.com)

级别从低到高分别是:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁

浅谈偏向锁、轻量级锁、重量级锁 - 简书 (jianshu.com)

偏向锁

适用于只有一个线程访问同步块的场景,它的目的是减少无竞争且只有一个线程使用锁的情况下,轻量级锁每次加锁、释放锁都需要 CAS 操作的性能消耗。

偏向锁加锁 :第一次加锁时会用 CAS 操作将自己的线程 ID 设置到锁对象的 MarkWord,之后每次加锁都会检查锁对象的 MarkWord 中有没有保存自己的线程 ID,是自己的线程 ID 则表示没有竞争,不用重新 CAS。

偏向锁撤销 :当另一个线程用 CAS 操作尝试去替换锁对象的 MarkWord 中保存的线程 ID 时,如果失败则会将偏向锁撤销,并且原持有偏向锁的线程如果还未退出同步代码块,则表明发生了锁竞争,会将锁膨胀为轻量级锁。

轻量级锁

当出现有两个线程来竞争锁的话,那么偏向锁就失效了,此时锁就会膨胀升级为轻量级锁。

轻量级锁加锁 :在当前线程的栈帧中创建一个锁记录 Lock Record,并且用 CAS 操作替换锁记录指针以及锁对象的 MarkWord,如果成功则表示获得锁,失败则表示其它线程竞争,当前线程尝试自旋来获取锁。

轻量级锁解锁 :用 CAS 操作替换当前线程的锁记录指针和锁对象的 MarkWord,如果成功则正常解锁,如果失败就会锁膨胀升级为重量级锁解锁流程。

自旋 :如果线程自旋获取锁超过了一定次数还没有获得锁,那么就会锁膨胀,尝试加重量级锁。

可重入: 发生重入则在栈帧中再创建一条锁记录用于重入计数,解锁时每次重置一条锁记录。

重量级锁

重量级锁加锁 :将 Monitor 的 Owner 设置为当前线程,其他线程再来发现已经 Owner 不为空则会被加入到 EntryList BLOCKED 阻塞队列中。

重量级锁解锁 :将 Owner 设置为 null,并且唤醒阻塞队列中的线程来竞争锁。

如果持有锁的线程调用 wait() 方法,则会被加入 WaitSet 中同时唤醒阻塞队列中的线程来竞争锁,等待被其他线程调用 notify() 方法或者 notifyAll() 方法唤醒,唤醒后被加入阻塞队列去竞争锁。

优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步块场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁竞争的线程使用自旋会消耗CPU 追求响应时间,锁占用时间很短
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间缓慢 追求吞吐量,锁占用时间较长

12.synchronized和ReentrantLock的区别

相同点

都是可重入锁,同一个线程每次获取锁,锁的计数器都自增 1,需要等到锁的计数器为 0 时才能真正的释放锁。

不同点

  • synchronized 依赖于 JVM 实现;ReentrantLock 是 JDK 层面实现的,需要 lock()unlock() 方法配合 try/finally 语句块来完成。
  • ReentrantLock 更加灵活:
    • 支持响应中断:让正在等待的线程放弃等待,改为处理其它事情
    • 可实现公平锁:让先等待的线程先获得锁(默认是非公平锁)
    • 条件变量实现选择性通知:借助 Condition 接口来实现,且一个锁可以实现多个条件(synchronized 只能实现一个默认的条件队列)

13. volatile关键字

并发编程的三个重要特性

原子性 :一个操作或者多次操作,要么所有的操作全部成功,要么所有的操作都不执行。

synchronized 关键字可以保证代码片段的原子性。

可见性 :一个线程对共享变量进行了修改,其他的线程立即能够看到该变量的这种修改。

volatile 关键字可以让改变量在被读写时强制刷新到主内存中。synchronized 关键字和 Lock 接口也可以实现可见性,加锁时会从主存中刷新共享变量的值,解锁时会将工作内存中的值同步到主内存当中。

有序性 :单线程中代码的执行是从前往后依次执行的,但是在多线程并发时,程序的执行可能是无序的。无序有可能是因为发生了指令重排序、工作内存和主内存同步延迟现象。

volatile 关键字可以禁止指令重排序,但是想要保证有序性,需要再加上 synchronized 关键字通过加锁来允许同一时间只有一个线程对共享变量进行操作。

synchronized关键字和volatile关键字的区别

它们是互补的存在。

  • volatile 关键字只能用于变量,synchronized 关键字可以修饰方法及代码块。
  • volatile 关键字可以保证数据的可见性,但不能保证原子性,synchronized 关键字两者都可以保证
  • volatile 关键字主要用于解决变量在多个线程之间的可见性,synchronized 关键字解决的是多个线程之间访问资源的同步问题。

14.ThreadLocal

如果创建了一个 ThreadLocal 变量,那么访问这个变量的每个线程都会有这个变量的本地副本。调用 get()set() 方法时操作的是自己本地内存中的变量,从而规避了线程安全问题。

Thread 类中有 threadLocals 变量,它是 ThreadLocalMap 类型的变量,可以把 ThreadLocalMap 理解成 ThreadLocal 类实现的定制化 HashMap,key 为当前线程的 this 引用,value 为本地变量值。默认情况下两个变量都是 null,只有当前线程调用 ThreadLocal 类的 set()get() 方法时才创建它们,实际上调用这两个方法的时候,调用的是 ThreadLocalMap 类对应的 set()get() 方法。

每个线程的本地变量不是存放在 ThreadLocal 实例中,而是放在调用线程的 ThreadLocals 变量里面。如果调用线程一直不终止,那么这个本地变量将会一直存放在它的 threadLocals 中,所以不使用本地变量的时候需要调用 remove() 方法将其删除,防止可能出现的内存溢出情况。

ThreadLocal的内存泄漏问题

在 ThreadLocalMap 中,使用的 key 为弱引用,而 value 是强引用。在发生 GC 的时候,key 会被清理掉,而 value 不会被清理掉。这样一来就会出现 key 为 null 的 Entry。假如不做任何措施,value 将永远无法被 GC 回收,就有可能会产生内存泄漏。所以使用完 ThreadLocal 方法后最好手动调用 remove() 方法。

15.线程池

为什么要用线程池?

线程池就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁而是放回池中,从而达到降低资源消耗、提高响应速度和提高线程的可管理型。

实现Runnable接口和Callable接口的区别

Runnable 接口不会返回结果或抛出检查异常,Callable 接口可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}

@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}

执行 execute()方法和 submit()方法的区别是什么呢?

execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功;

submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个对象的 get() 方法可以获取返回值,get() 方法会阻塞当前线程直到任务完成,可以传入参数设置等待时间。

如何创建线程池

1 通过构造方法实现

2 通过 Executor 框架的工具类 Executors 来实现

  • FixedThreadPool:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor:方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

推荐通过 ThreadPoolExecutor 的方式创建线程池,原因如下:

Executors 创建线程时的默认实现固定了队列长度和线程数量。

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

ThreadPoolExecutor类分析

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

ThreadPoolExecutor 类共提供了四个构造方法。其余三个都是在这个构造方法上提供了默认参数,只需要看这个最长的就好了。

参数分析

最重要的 3 个参数:

  • corePoolSize:核心线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize:当队列中存放的任务达到队列最大容量的时候,可以同时运行的线程数量变为最大线程数
  • workQueue:当新任务来的时候会先判断当前运行的线程数量是否达到了核心线程数,如果达到的话新任务就会被放入任务队列中。

其他参数:

  • keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这个时候没有新的任务提交,核心线程之外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime 才会被回收销毁。
  • unit:keepAliveTime 参数的时间单位。
  • threadFactory:executor 创建新线程的时候会用到。
  • handler:拒绝策略。

ThreadPoolExecutor拒绝策略

拒绝策略的定义:如果当前同时运行的线程数量达到最大线程数量,并且队列也已经被满了,ThreadPoolExecutor 定义了一些拒绝策略:

  • ThreadPoolExecutor.AbortPolicy:拒绝执行,抛出 RejectedExecutionException 来拒绝新任务的处理。【默认】
  • ThreadPoolExecutor.CallerRunsPolicy:让调用者自己执行任务。
  • ThreadPoolExecutor.DiscardPolicy:直接丢弃。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃最早的未处理的任务请求。

手撕线程池

  1. 自定义任务队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    // 拒绝策略
    @FunctionalInterface
    interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
    }

    @Slf4j
    class BlockingQueue<T> {
    // 任务队列
    private Deque<T> queue = new ArrayDeque<>();
    // 锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量,添加线程,满的时候等待
    private Condition fullWaitSet = lock.newCondition();
    // 消费者条件变量,执行线程,空的时候等待
    private Condition emptyWaitSet = lock.newCondition();
    // 容量
    private int capcity;

    public BlockingQueue(int capacity) {
    this.capacity = capacity;
    }
    // 阻塞获取
    public T task() {
    lock.lock();
    try {
    // while 是为了防止被虚假唤醒,即唤醒过后资源已经被其他线程取走了。
    while (queue.isEmpty()) {
    try {
    emptyWaitSet.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    T t = queue.removeFirst();
    // 生产者干活啦!
    fullWaitSet.signal();
    return t;
    } finally {
    lock.unlock();
    }
    }
    // 带超时的阻塞获取
    public T poll(long timeout, TimeUnit unit) {
    lock.lock();
    try {
    long nanos = unit.toNanos(timeout);
    while (queue.isEmpty()) {
    try {
    if (nanos <= 0) {
    return null;
    }
    nanos = emptyWaitSet.awaitNanos(nanos);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    T t = queue.removeFirst();
    // 生产者干活啦!
    fullWaitSet.signal();
    return t;
    } finally {
    lock.unlock();
    }
    }
    // 阻塞添加
    public void put(T task) {
    lock.lock();
    try {
    while (queue.size() == capacity) {
    try {
    fullWaitSet.await();
    log.debug("等待任务加入队列{}", task);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    log.debug("加入任务队列{}", task);
    queue.addLast(task);
    // 消费者来开饭啦!
    emptyWaitSet.signal();
    } finally {
    lock.unlock();
    }
    }
    // 带超时的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
    lock.lock();
    try {
    long nanos = timeUnit.toNanos(timeout);
    while (queue.size() == capcity) {
    try {
    if (nanos <= 0) {
    return false;
    }
    log.debug("等待加入队列{}", task);
    nanos = fullWaitSet.awaitNanos(nanos);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    log.debug("加入任务队列{}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    return true;
    } finally {
    lock.unlock();
    }
    }
    // 带拒绝策略的添加
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
    lock.lock();
    try {
    if (queue.size() == capcity) {
    rejectPolicy.reject(this, task);
    } else {
    log.debug("加入任务队列{}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    }
    } finally {
    lock.unlock();
    }
    }

    // 返回队列长度
    public int size() {
    lock.lock();
    try {
    return queue.size();
    } finally {
    lock.unlock();
    }
    }
    }
  2. 自定义线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    @Slf4j
    class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;
    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();
    // 核心线程数
    private int coreSize;
    // 获取任务时的超时时间
    private long timeout;
    private TimeUnit timeUnit;
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy, int queueCapcity) {
    this.coreSize = coreSize;
    this.timeout = timeout;
    this.timeUnit = timeUnit;
    this.rejectPolicy = rejectPolicy;
    this.taskQueue = new BlockingQueue<>(queueCapcity);
    }
    // 执行任务
    public void execute(Runnable task) {
    synchronized (workers) {
    if (workers.size() < coreSize) {
    Worker worker = new Worker(task);
    log.debug("新增worker{}", worker);
    workers.add(worker);
    worker.start();
    } else {
    taskQueue.tryPut(rejectPolicy, task);
    }
    }
    }

    class Worker extends Thread {
    private Runnable task;
    public Worker(Runnable task) {
    this.task = task;
    }
    @Override
    public void run() {
    // 当task不为空,执行任务;当task执行完毕,从任务队列中获取任务并执行
    while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
    try {
    log.debug("正在执行{}", task);
    task.run();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    task = null;
    }
    }
    synchronized (workers) {
    log.debug("worker被移除{}", this);
    workers.remove(this);
    }
    }
    }
    }
  3. 测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS,
    // 行为化参数,指定拒绝策略
    (queue, task) -> {
    // 1. 死等
    // queue.put(task);
    // 2) 带超时等待
    queue.offer(task, 1500, TimeUnit.MILLISECONDS);
    // 3) 让调用者放弃任务执行
    // log.debug("放弃{}", task);
    // 4) 让调用者抛出异常
    // throw new RuntimeException("任务执行失败 " + task);
    // 5) 让调用者自己执行任务
    // task.run();
    }, 1);
    for (int i = 0; i < 4; i++) {
    int j = i;
    threadPool.execute(() -> {
    try {
    Thread.sleep(1000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.debug("{}", j);
    });
    }

线程池原理

线程池的工作工程

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    1. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    2. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    3. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    4. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectExecutionException。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

16.Atomic原子类

Atomic 类在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。

JUC 包中的原子类包括哪4类

  • 基本类型:使用原子的方式更新基本类型
    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean:布尔型原子类
  • 数组类型:使用原子的方式更新数组里的某个元素
    • AtomicIntegerArray:整型数组原子类
    • AtomicLongArray:长整型原子类
    • AtomicReferenceArray:引用类型数组原子类
  • 引用类型
    • AtomicReference:引用类型原子类
    • AtomicStampedReference:原子更新带有版本号的引用类型。可以解决 CAS 中的 ABA 问题。
    • AtomicMarkableReference:原子更新带有标记位的引用类型。
  • 对象的属性修改类型
    • AtomicIntegerFieldUpdater:原子更新整型字段的更新器
    • AtomicLongFieldUpdater:原子更新长整型字段的更新器
    • AtomicReferenceFieldUpdater:原子更新引用类型字段的更新器

AtomicInteger的使用

常用方法:

1
2
3
4
5
6
7
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

也有 addAndGet()decrementAndGet()incrementAndGet() 等方法。

使用 AtomicInteger 之后,不用对 increment() 方法加锁也可以保证线程安全。

AtomicInteger类的原理

1
2
3
4
5
6
7
8
9
10
11
12
13
// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
// 本地方法,这个方法是用来拿到“原来的值”的内存地址
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

AtomicInteger 类主要利用 CAS + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率比较高。

getAndIncrement() 方法举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
// AtomicInteger.getAndIncrement()
public final int getAndIncrement() {
return U.getAndAddInt(this, valueOffset, 1);
}
// Unsafe.getAndAddInt()
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
// CAS
do {
v = this.getIntVolatile(o, offset);
} while (!this.compareAndSwapInt(o, offset, v, v + delta));
return v;
}

17.AQS

AQS 抽象的队列式的同步器,是阻塞式锁和相关的同步器工具的框架,许多同步类实现都依赖于它,如常用的 Lock、Semaphore、ReentrantLock等。

  • volatile int state 属性来表示资源的状态(分独占模式和共享模式),独占模式(Exclusive)是只有一个线程能够访问资源(如 ReentrantLock),而共享模式(Share)可以允许多个线程访问资源(如 Semaphore / CountDownLatch)。

    state 的访问方式有三种:

    1. getState()——获取 state 状态
    2. setState()——设置 state 状态
    3. compareAndSetState()——CAS 机制设置 state 状态
  • 提供了一个 FIFO 的线程等待队列,类似于 Monitor 的 EntryList

  • 条件变量来实现等待唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

AQS的自定义同步器实现

自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。

自定义同步器实现时主要实现以下几种方法:

  • tryAcquire(int):独占方式。尝试获取资源,成功返回 true,失败则返回 false。
  • tryRelease(int):独占方式。尝试释放资源,成功返回 true,失败则返回 false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回 true,否则返回 false。
  • isHeldExclusively():该线程是否正独占资源。只有用到 condition 才需要去实现它。

AQS的原理

其核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求资源被占用,则把它放到一个双端队列中,等候被唤醒。

拿 ReentranLock 的工作流程举例:

加锁

如果当前没有线程持有锁并且没有竞争,那么线程 t0 调用 lock() 方法加锁,先尝试用 CAS 操作将锁的状态 state 由 0 改为 1,成功后将 owner 设置为当前线程。

此时来了第二个线程 t1,调用 lock() 方法加锁,先调用 tryAcquire() 方法尝试用 CAS 操作将锁的状态 state 由 0 改为 1,失败,进入 acquire() 方法进行下一步处理:

  • 再次调用 tryAcquire() 方法将锁的状态 state 由 0 改为 1,结果失败
  • 调用 addWaiter() 方法,将当前线程 t1 以 Node 的形式加入队列当中,进入 acquireQueued() 方法尝试为队列中排队的线程获取锁
  • acquireQueued() 方法会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
    • 如果当前线程 t1 所在的节点紧邻着 head (即排在第二位),那么再次调用 tryAcquire() 方法尝试获取锁,结果失败
    • 进入 shouldParkAfterFailedAcquire() 方法,将前驱 Node,此时将 head 的 waitStatus 改为 -1,表示它有责任唤醒它的后继节点,第一次调用返回 false,再次进入循环调用 tryAcquire() 方法,结果失败
    • 此时再次进入 shouldParkAfterFailedAcquire() 方法,因为前驱 Node 的 waitStatus 已经是 -1,这次返回 true
    • 进入 parkAndCheckInterupt() 方法,将当前线程 t1 park

acquireQueued() 方法的逻辑:一个线程获取锁失败了,被放入等待队列,acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

解锁

t0 线程释放锁,进入 tryRelease() 方法,如果成功设置 exclusiveOwnerThread 为 null,state 设置为 0。

如果当前的队列不为 null,并且 head 的 waitStatus 为 -1,进入 unparkSuccessor() 方法,找到队列中离 head 最近的一个没有被取消的 Node,unpark 恢复其运行。

AQS对资源的共享方式

  • Exclusive(独占):只有一个线程能执行,如 ReentrantLock。又分为公平锁和非公平锁
    • 公平锁:按照线程在队列中的排队顺序,先到的先拿到锁
    • 非公平锁:当线程要获取锁时,无视队列顺序直接去竞争锁