原文:https://juejin.cn/post/6844903560899985415
ThreadPoolExecutor算是JUC中最常用的类之一了。ThreadPoolExecutor,顾名思义,thread-pool-executor,硬翻译就是“线程-池-执行者”;java中,通过ThreadPoolExecutor可以很容易的创建一个线程池。但是我们为什么要使用线程池?呢?它能够带来什么样的优势呢?它又是怎么实现的呢?OK,带着这几个问题,我们来学习一下JAVA中的线程池技术。
为什么要使用线程池? 关于这个问题其实有点鸡肋,我觉得再问这个问题之前更应该问为什么要有线程池。那为什么呢?
this is a 例子:
快递行业最近两年发展的灰常火热,听说工资也非常的高,搞得我一天天的都没有心思去好好写代码了...
之前的小快递公司都是没有固定的快递员的,就是说,每次去送一件快递,站点负责人就需要去找一个人来帮忙送,送完之后就没有然后了(当然,钱还是要给的)。
但是后来随着货越来越多,找人给钱成本太大,而且农忙时还需要花很长时间去找人,所以就雇用了5个人,签了合同,长期为站点配送。
以前都是随时用随时找,现在不是,现在是成立了一个物流公司,开了一个配送部,配送部门规定正式配送员最多只能有五个人。
之前配送的缺点是什么:
每次有货,我都会去临时找一个人,然后签订临时合同,送完之后解除合同。很麻烦。 这也是不用线程池的缺点,就是任务来了,我们需要频繁的去创建新的线程,用完之后还需要释放线程资源,对于系统的消耗是很大的。
因为配送的货车只有那么几个,如果临时签订的人多了,车子不够用,其他人只能等着车子送完之后才能用。
成立配送部之后解决的问题
成立配送部之后呢,因为签订的是劳务合同,我们可以重复的让配送员配送不同的货物。达到线程资源的复用。
因为限定了最多招聘的人数,可以很好的避免招过多无用的人。
OK,我们以上述例子来对应理解线程池的基本原理
先来看下,JAVA对ThreadPoolExecutor的类申明:
1 public class ThreadPoolExecutor extends AbstractExecutorService
在【初识】-JUC·Executor框架 中给出了Executor的继承体系。ThreadPoolExecutor就是具备线程池功能的集成者。
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
从上面的代码可以看出,构造方法(一、二、三)都是通过调用(四)来做具体属性初始化的。那么我们直接来看构造方法四;在构造方法四中总共需要7个参数,先来看下每个参数的具体含义:
corePoolSize
核心线程数大小。那么什么是核心线程数呢,我们可以类比于上面例子中的配送部中签订劳动合同的人的个数。
maximumPoolSize
最大线程数。加入说现在是双十一期间,快递异常的多,配送部的5个人完全忙不过来,而且仓库也满了,怎么办呢?这个时候就需要再招聘一些临时配送员,假设maximumPoolSize为10,那么也就是说,临时招聘可以招5个人,配送部签订正式劳动合同的人和签订临时合同的人加一块不能超过配送部规定的最大人数(10人)。所以说,maximumPoolSize就是线程池能够允许的存在的最大线程的数量。
keepAliveTime
存活时间。为什么要有这个呢?想一下,双十一过去了,货物已经配送的差不多了。临时合同写的是如果临时配送员2天没有配送了,那配送部就有权利终止临时合同,现在已经达到2天这个点了,需要开除这些临时配送专员了。对于线程池来说,keepAliveTime就是用来表示,当除核心线程池之外的线程超过keepAliveTime时间之后,就需要被系统回收了。
unit
keepAliveTime的时间单位。
workQueue
工作队列。这个就相当于一个仓库,现在配送部5个人都在配送,但是还不断的有新的快递达到,这个时候就需要一个仓库来存放这些快递。对于线程池来说,当核心线程都有自己的任务处理,并且还有任务进来的时候,就会将任务添加到工作队列中去。
threadFactory
线程工厂。就是用来创建线程的。可以类比成招聘组,会给每个线程分配名字或者编号这样。
handler
RejectedExecutionHandler 用来描述拒绝策略的。假设现在我的仓库也满足,并且配送部已经达到10个人了。怎么办呢,那么只能采用一些策略来拒绝任务了。
线程池的状态 1 2 3 4 5 6 7 8 9 10 11 private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
下面是在网上发现的一位大牛的图;感觉可以较为直观的描述状态的变更
工作原理
有几个点需要注意。
1、如何提交一个任务到线程池? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
如果少于corePoolSize线程正在运行,请尝试使用给定命令启动一个新线程作为其第一个任务。 对addWorker的调用会自动检查runState和workerCount,从而防止错误报警,在不应该的时候通过返回false来添加线程。
如果一个任务能够成功排队,那么我们仍然需要再次检查是否应该添加一个线程(因为现有的线程自上次检查以来已经死掉)或者自从进入这个方法以来,池关闭了。所以我们重新检查状态,如果当前command已经stop了,那么就退出工作队列,如果没有的话就开始一个新的线程。
如果队列满了,会想尝试去创建一个新的线程去执行,如果创建不了,那就执行拒绝策略。
2、如何创建一个线程去处理任务? 通过实现这个接口去创建一个新的线程
1 2 3 public interface ThreadFactory { Thread newThread (Runnable r) ; }
3、如何将任务添加到队列? 通过addWorker方法来添加,其实在excute中只是作为一个提交任务的入口,实际的处理逻辑都是在addWorker这个方法里来完成的。addWorker有两个参数:
firstTask 当前任务
core 用来标注当前需要创建的线程是否是核心线程,如果core为true,则表明创建的是核心线程,也就是说当前还没有达到最大核心线程数。
先来看下这个方法的前半部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } }
//如果超过CAPACITY限制了则直接返回false
//判断当前的workerCount是否大于corePoolsize,否则则判断是否大于maximumPoolSize //具体的比较取决于入参core是true还是false。
1 wc >= (core ? corePoolSize : maximumPoolSize)
如果上面两个有一个满足了,则直接返回false。
下面是判断WorkerCount通过CAS操作增加1是否成功,成功的话就到此结束
1 2 if (compareAndIncrementWorkerCount(c)) break retry;
如果不成功,则再次判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断。
1 2 3 c = ctl.get(); if (runStateOf(c) != rs) continue retry;
再来看下这个方法的后面半个部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 boolean workerStarted = false ;boolean workerAdded = false ;Worker w = null ;try { final ReentrantLock mainLock = this .mainLock; w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;}
拒绝策略有哪些?
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:使用调用者自己的当前线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
当然我们也可以自定义拒绝策略。
常用工作队列类型 1、ArrayBlockingQueue
基于数组的阻塞队列,长度有限
2、LinkedBlockingQuene
基于链表的阻塞队列,长度无限,使用这个可能会导致我们的拒绝策略失效。因为可以无限的创建新的工作线程。
3、PriorityBlockingQueue
具有优先级的无界阻塞队列;
3、SynchronousQuene
SynchronousQuene是一个是一个不存储元素的BlockingQueue;每一个put操作必须要等待一个take操作,否则不能继续添加元素。所以这个比较特殊,它不存我们的任务,也就说说它的每个put操作必须等到另一个线程调用take操作,否则put操作一直处于阻塞状态。
Worker 这个是ThreadPoolExecutor的一个内部类,表示一个工作线程。重要的是这个内部类实现了AbstractQueuedSynchronizer(AQS:抽象队列同步器)抽象类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
runWorker 最后来看下runWorker这个方法(ThreadPoolExecutor中的方法):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
下面是对注释的蹩脚翻译,欢迎吐槽,但注意尺度,O(∩_∩)O哈哈~
主要工作循环运行。重复地从队列中获取任务并执行它们,同时处理一些问题:
我们可能会从最初的任务开始,在这种情况下,我们不需要得到第一个任务。否则,只要池正在运行,我们就从getTask获得任务。 如果它返回null,则由于更改池状态或配置参数而导致worker退出。其他退出的结果是在外部代码中抛出的异常,在这种情况下completeAbruptly成立,这通常会导致processWorkerExit来取代这个线程。
在运行任何任务之前,获取锁以防止任务正在执行时发生其他池中断,调用clearInterruptsForTaskRun确保除非池正在停止,则此线程没有设置其中断。
每个任务运行之前都会调用beforeExecute,这可能会引发一个异常,在这种情况下,我们会导致线程死亡(断开循环completeAbruptly为true),而不处理任务。
假设beforeExecute正常完成,我们运行任务,收集任何抛出的异常发送到afterExecute。 我们分别处理RuntimeException,Error(这两个规范保证我们陷阱)和任意的Throwables。 因为我们不能在Runnable.run中重新抛出Throwable,所以我们把它们封装在Errors中(到线程的UncaughtExceptionHandler)。 任何抛出的异常也保守地导致线程死亡。
task.run完成后,我们调用afterExecute,这也可能会抛出一个异常,这也会导致线程死亡。 根据JLS Sec 14.20,即使task.run抛出,这个异常也是有效的。
异常机制的最终效果是afterExecute和线程的UncaughtExceptionHandler拥有关于用户代码遇到的任何问题的准确信息。
总结 本文是 JUC 的第二篇,意在通过查看源码来了解线程池的具体工作原理。文中如果存在不当的描述,希望小伙伴们能够及时提出。灰常感谢!