JUC·ThreadPoolExecutor 线程池

原文:https://juejin.cn/post/6844903560899985415

ThreadPoolExecutor算是JUC中最常用的类之一了。ThreadPoolExecutor,顾名思义,thread-pool-executor,硬翻译就是“线程-池-执行者”;java中,通过ThreadPoolExecutor可以很容易的创建一个线程池。但是我们为什么要使用线程池?呢?它能够带来什么样的优势呢?它又是怎么实现的呢?OK,带着这几个问题,我们来学习一下JAVA中的线程池技术。

为什么要使用线程池?

关于这个问题其实有点鸡肋,我觉得再问这个问题之前更应该问为什么要有线程池。那为什么呢?


this is a 例子:

快递行业最近两年发展的灰常火热,听说工资也非常的高,搞得我一天天的都没有心思去好好写代码了...

之前的小快递公司都是没有固定的快递员的,就是说,每次去送一件快递,站点负责人就需要去找一个人来帮忙送,送完之后就没有然后了(当然,钱还是要给的)。

但是后来随着货越来越多,找人给钱成本太大,而且农忙时还需要花很长时间去找人,所以就雇用了5个人,签了合同,长期为站点配送。

以前都是随时用随时找,现在不是,现在是成立了一个物流公司,开了一个配送部,配送部门规定正式配送员最多只能有五个人。

之前配送的缺点是什么:

  • 每次有货,我都会去临时找一个人,然后签订临时合同,送完之后解除合同。很麻烦。
    这也是不用线程池的缺点,就是任务来了,我们需要频繁的去创建新的线程,用完之后还需要释放线程资源,对于系统的消耗是很大的。
  • 因为配送的货车只有那么几个,如果临时签订的人多了,车子不够用,其他人只能等着车子送完之后才能用。

成立配送部之后解决的问题

  • 成立配送部之后呢,因为签订的是劳务合同,我们可以重复的让配送员配送不同的货物。达到线程资源的复用。
  • 因为限定了最多招聘的人数,可以很好的避免招过多无用的人。

OK,我们以上述例子来对应理解线程池的基本原理

先来看下,JAVA对ThreadPoolExecutor的类申明:

1
public class ThreadPoolExecutor extends AbstractExecutorService 

【初识】-JUC·Executor框架中给出了Executor的继承体系。ThreadPoolExecutor就是具备线程池功能的集成者。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//构造方法一
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);

}
//构造方法二
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
//构造方法三
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//构造方法四
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

从上面的代码可以看出,构造方法(一、二、三)都是通过调用(四)来做具体属性初始化的。那么我们直接来看构造方法四;在构造方法四中总共需要7个参数,先来看下每个参数的具体含义:

  • corePoolSize

    核心线程数大小。那么什么是核心线程数呢,我们可以类比于上面例子中的配送部中签订劳动合同的人的个数。

  • maximumPoolSize

    最大线程数。加入说现在是双十一期间,快递异常的多,配送部的5个人完全忙不过来,而且仓库也满了,怎么办呢?这个时候就需要再招聘一些临时配送员,假设maximumPoolSize为10,那么也就是说,临时招聘可以招5个人,配送部签订正式劳动合同的人和签订临时合同的人加一块不能超过配送部规定的最大人数(10人)。所以说,maximumPoolSize就是线程池能够允许的存在的最大线程的数量。

  • keepAliveTime

    存活时间。为什么要有这个呢?想一下,双十一过去了,货物已经配送的差不多了。临时合同写的是如果临时配送员2天没有配送了,那配送部就有权利终止临时合同,现在已经达到2天这个点了,需要开除这些临时配送专员了。对于线程池来说,keepAliveTime就是用来表示,当除核心线程池之外的线程超过keepAliveTime时间之后,就需要被系统回收了。

  • unit

    keepAliveTime的时间单位。

  • workQueue

    工作队列。这个就相当于一个仓库,现在配送部5个人都在配送,但是还不断的有新的快递达到,这个时候就需要一个仓库来存放这些快递。对于线程池来说,当核心线程都有自己的任务处理,并且还有任务进来的时候,就会将任务添加到工作队列中去。

  • threadFactory

    线程工厂。就是用来创建线程的。可以类比成招聘组,会给每个线程分配名字或者编号这样。

  • handler

    RejectedExecutionHandler 用来描述拒绝策略的。假设现在我的仓库也满足,并且配送部已经达到10个人了。怎么办呢,那么只能采用一些策略来拒绝任务了。

线程池的状态

1
2
3
4
5
6
7
8
9
10
11
// runState is stored in the high-order bits
//RUNNING;该状态的线程池接收新任务,并且处理阻塞队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN;该状态的线程池不接收新任务,但会处理阻塞队列中的任务;
private static final int SHUTDOWN = 0 << COUNT_BITS;
//STOP;不接收新任务,也不处理阻塞队列中的任务,并且会中断正在运行的任务;
private static final int STOP = 1 << COUNT_BITS;
//所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态
private static final int TIDYING = 2 << COUNT_BITS;
//线程池彻底终止,就变成TERMINATED状态。
private static final int TERMINATED = 3 << COUNT_BITS;

下面是在网上发现的一位大牛的图;感觉可以较为直观的描述状态的变更

工作原理

线程池执行原理

有几个点需要注意。

1、如何提交一个任务到线程池?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void execute(Runnable command) {
//任务为null,直接抛出空指针异常
if (command == null)
throw new NullPointerException();

int c = ctl.get();
//如果线程数大于等于基本线程数,将任务加入队列
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
  • 如果少于corePoolSize线程正在运行,请尝试使用给定命令启动一个新线程作为其第一个任务。 对addWorker的调用会自动检查runState和workerCount,从而防止错误报警,在不应该的时候通过返回false来添加线程。
  • 如果一个任务能够成功排队,那么我们仍然需要再次检查是否应该添加一个线程(因为现有的线程自上次检查以来已经死掉)或者自从进入这个方法以来,池关闭了。所以我们重新检查状态,如果当前command已经stop了,那么就退出工作队列,如果没有的话就开始一个新的线程。
  • 如果队列满了,会想尝试去创建一个新的线程去执行,如果创建不了,那就执行拒绝策略。

2、如何创建一个线程去处理任务?

通过实现这个接口去创建一个新的线程

1
2
3
public interface ThreadFactory {
Thread newThread(Runnable r);
}

3、如何将任务添加到队列?

通过addWorker方法来添加,其实在excute中只是作为一个提交任务的入口,实际的处理逻辑都是在addWorker这个方法里来完成的。addWorker有两个参数:

  • firstTask 当前任务
  • core 用来标注当前需要创建的线程是否是核心线程,如果core为true,则表明创建的是核心线程,也就是说当前还没有达到最大核心线程数。

先来看下这个方法的前半部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//自旋方式
for (;;) {
//获取当前线程池的状态
int c = ctl.get();
int rs = runStateOf(c);

//如果状态是STOP,TIDYING,TERMINATED状态的话,则会返回false
//如果状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//通过自旋的方式,判断要添加的worker是否为corePool范畴之内的
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

//如果超过CAPACITY限制了则直接返回false

1
wc >= CAPACITY

//判断当前的workerCount是否大于corePoolsize,否则则判断是否大于maximumPoolSize
//具体的比较取决于入参core是true还是false。

1
wc >= (core ? corePoolSize : maximumPoolSize)

如果上面两个有一个满足了,则直接返回false。

下面是判断WorkerCount通过CAS操作增加1是否成功,成功的话就到此结束

1
2
if (compareAndIncrementWorkerCount(c))
break retry;

如果不成功,则再次判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断。

1
2
3
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;

再来看下这个方法的后面半个部分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
//创建一个新的Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
//
if (t != null) {
//加锁
mainLock.lock();
try {
// 在锁定的情况下重新检查。
// 在一下情况退出:ThreadFactory 创建失败或者在获取锁之前shut down了
int c = ctl.get();
int rs = runStateOf(c);
//状态校验
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 预先检查t是可以启动的
throw new IllegalThreadStateException();
//添加至workers中
workers.add(w);
int s = workers.size();
//如果超过了历史最大线程数,则将当前池数量设置为历史最大线程记录数
if (s > largestPoolSize)
largestPoolSize = s;
//标识添加工作线程成功
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//如果添加成功则启动当前工作线程
if (workerAdded) {
t.start();
//并将当前线程状态设置为已启动
workerStarted = true;
}
}
} finally {
//添加失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

拒绝策略有哪些?

  • 1、AbortPolicy:直接抛出异常,默认策略;
  • 2、CallerRunsPolicy:使用调用者自己的当前线程来执行任务;
  • 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • 4、DiscardPolicy:直接丢弃任务;

当然我们也可以自定义拒绝策略。

常用工作队列类型

1、ArrayBlockingQueue

基于数组的阻塞队列,长度有限

2、LinkedBlockingQuene

基于链表的阻塞队列,长度无限,使用这个可能会导致我们的拒绝策略失效。因为可以无限的创建新的工作线程。

3、PriorityBlockingQueue

具有优先级的无界阻塞队列;

3、SynchronousQuene

SynchronousQuene是一个是一个不存储元素的BlockingQueue;每一个put操作必须要等待一个take操作,否则不能继续添加元素。所以这个比较特殊,它不存我们的任务,也就说说它的每个put操作必须等到另一个线程调用take操作,否则put操作一直处于阻塞状态。

Worker

这个是ThreadPoolExecutor的一个内部类,表示一个工作线程。重要的是这个内部类实现了AbstractQueuedSynchronizer(AQS:抽象队列同步器)抽象类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** 当前work持有的线程 */
final Thread thread;
/** 运行的初始任务。 可能为空。*/
Runnable firstTask;
/** 每个线程完成任务的计数器 */
volatile long completedTasks;

/**
* 构造函数
*/
Worker(Runnable firstTask) {
// 禁止中断,直到runWorker
setState(-1);
//想提交的任务交给当前工作线程
this.firstTask = firstTask;
//通过线程工厂创建一个新的线程
this.thread = getThreadFactory().newThread(this);
}

/** 将run方法的执行委托给外部runWorker */
public void run() {
runWorker(this);
}

// 是否锁定
//
// 0代表解锁状态。
// 1代表锁定状态。

protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试获取锁(重写AQS的方法)
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁(重写AQS的方法)
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//加锁
public void lock() { acquire(1); }
//尝试加锁
public boolean tryLock() { return tryAcquire(1); }
//解锁
public void unlock() { release(1); }
//是否锁定
public boolean isLocked() { return isHeldExclusively(); }
//如果启动则中断
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

runWorker

最后来看下runWorker这个方法(ThreadPoolExecutor中的方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

下面是对注释的蹩脚翻译,欢迎吐槽,但注意尺度,O(∩_∩)O哈哈~

主要工作循环运行。重复地从队列中获取任务并执行它们,同时处理一些问题:

  • 我们可能会从最初的任务开始,在这种情况下,我们不需要得到第一个任务。否则,只要池正在运行,我们就从getTask获得任务。 如果它返回null,则由于更改池状态或配置参数而导致worker退出。其他退出的结果是在外部代码中抛出的异常,在这种情况下completeAbruptly成立,这通常会导致processWorkerExit来取代这个线程。
  • 在运行任何任务之前,获取锁以防止任务正在执行时发生其他池中断,调用clearInterruptsForTaskRun确保除非池正在停止,则此线程没有设置其中断。
  • 每个任务运行之前都会调用beforeExecute,这可能会引发一个异常,在这种情况下,我们会导致线程死亡(断开循环completeAbruptly为true),而不处理任务。
  • 假设beforeExecute正常完成,我们运行任务,收集任何抛出的异常发送到afterExecute。 我们分别处理RuntimeException,Error(这两个规范保证我们陷阱)和任意的Throwables。 因为我们不能在Runnable.run中重新抛出Throwable,所以我们把它们封装在Errors中(到线程的UncaughtExceptionHandler)。 任何抛出的异常也保守地导致线程死亡。
  • task.run完成后,我们调用afterExecute,这也可能会抛出一个异常,这也会导致线程死亡。 根据JLS Sec 14.20,即使task.run抛出,这个异常也是有效的。

异常机制的最终效果是afterExecute和线程的UncaughtExceptionHandler拥有关于用户代码遇到的任何问题的准确信息。

总结

本文是 JUC 的第二篇,意在通过查看源码来了解线程池的具体工作原理。文中如果存在不当的描述,希望小伙伴们能够及时提出。灰常感谢!

作者

卫恒

发布于

2018-11-11

更新于

2022-04-23

许可协议

评论