什么是线程池
Thread Pool
是一种基于池化思想管理线程的工具,经常出现在多线程程序中。
线程池的优点:
降低资源消耗:通过池化技术重复利用已创建线程。
提高响应速度:任务到达时,无需等待进程创建即可执行。
提高线程的可管理性:使用线程池进行统一的分配、调优和监控。
提供更多强大的功能:线程池具备可扩展性,允许开发人员向其中增加更多功能。
为什么用线程池
直接创建线程存在性能开销:
Java中线程是基于内核线程实现的,线程的创建和销毁需要进行系统调用,性能开销较高。
Java8中,每个Thread
都需要有一个内核线程的支持,这意味着每个Thread
都需要消耗一定的内核资源。Java8中每个线程栈大小是1M,Java11中,对创建线程操作进行优化,创建一个线程只需要40KB。
线程切换引起context switch
。
使用线程池解决的核心问题就是资源管理问题 ,多线程环境下,不确定性会带来一些问题:频繁申请/销毁线程会带来额外的开销、存在资源耗尽的风险等。
使用池化思想将资源统一在一起管理的一种思想,可以最大化收益最小化风险,
ThreadPoolExecutor
继承关系
ThreadPoolExecutor
类的继承关系如下:
ThreadPoolExecutor继承关系
Executor:顶层的Executor
仅提供一个execute()
接口,实现了提交任务与执行任务的解耦。
ExecutorService:继承自Executor
,实现了添加了其他接口,例如:
为一个或一批异步任务生成Future的方法
提供了管控线程池的方法,例如停止线程池运行。
AbstractExecutorService:实现了ExecutorService
,实现了除execute()
以外的所有方法,将最重要的execute()
交给ThreadPoolExecutor
实现。
运行机制
ThreadPoolExecutor
的基本运行机制如下图所示(图片来源:美团技术团队):
ThreadPoolExecutor
线程池内部相当于一个生产者消费者模型,将线程池分成两个部分:任务管理、线程管理。
任务管理相当于生产者,任务提交后,线程池判断该任务的后续操作。
直接申请线程执行该任务
存放到阻塞队列中等待
拒绝该任务。
线程管理部分是消费者,根据任务请求进行线程分配工作,当线程执行完任务后会继续获取新的任务去执行,最终当线程获取不到任务时,线程会进行回收。
以下三个问题需要解决:
线程池如何维护自身状态?
线程池如何管理任务?
线程池如何管理线程?
构造方法
核心的构造方法如下,主要参数有:
corePoolSize :核心线程数量
maximumPoolSize :最大线程数量
workQueue :保存等待执行任务的阻塞队列,当提交一个新的任务到线程池时,线程池根据当前状态决定后续处理。
keepAliveTime :线程池维护线程所允许的时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime。
threadFactory :它是ThreadFactory
类型的变量,用来创建新线程。
handler :它是RejectedExecutionHandler
类型的变量,表示线程池的拒绝策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
线程池的状态
线程池的运行状态,由AtomicInteger ctl
维护,其中分为两个参数:runState
和workerCount
,高3位存储runState
,低29位存储workerCount
,提供了位运算的方法来获取对应的参数。线程池的运行状态,通过内部进行调整。不需要套
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }
RUNNING
能接受新提交的任务,并且能处理阻塞队列中的任务
SHUTDOWN
关闭状态,不再接受新提交的任务,但是可以继续处理阻塞队列中的任务。
STOP
不能接受新的任务,也不处理队列中的任务,会中断正在处理任务的线程。
TIDYING
如果所有的任务都终止了,workerCount
位0,线程池会调用terminated()
进入TERMINATED状态。
TERMINATED
执行完terminated()
方法后进入该状态。
线程池的状态转换过程如下图所示:
线程池状态转换过程
任务调度机制
用户提交一个任务给线程池,线程池如何进行调度,如何对任务进行管理。这是这部分的核心问题。
调度工作是由execute
方法完成,由此可以看出该方法的重要性。
其基本执行过程如下:
首先检查线程池运行状态,如果不是RUNNING
,则直接拒绝。
如果workerCount <
corePoolSize,则创建并启动一个线程来执行新任务。
如果workerCount >=
corePoolSize,且线程池内的阻塞队列未满,则将任务添加到阻塞队列中。
如果workerCount >= corePoolSize && workCount <
maximumPoolSize,且线程池内阻塞队列已满,则创建并启动一个线程来执行新任务
如果workerCount >=
maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
其执行流程如下图所示:
任务调度流程
addWorker
该方法主要是在线程池中新建一个worker
线程并执行任务,两个参数的含义如下:
firstTask
:执行新增的线程执行的第一个任务
core
:检测标识
true
:新增线程时会判断当前活动线程是否小于corePoolSize
false
:新增线程时会判断当前活动线程是否少于maximumPoolSize
方法的具体含义参考代码注释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
拒绝策略
线程池提供了如下四种策略:
AbortPolicy :直接抛出异常,这是默认策略。
CallerRunsPolicy :用调用者所在的线程来执行任务。
DiscardOldestPolicy :丢弃阻塞队列中靠最前的任务,并执行当前任务。
DiscardPolicy :直接丢弃任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException ("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
任务阻塞机制
任务阻塞机制是线程池管理任务的核心机制。线程池将任务和线程进行解耦,以生产者消费者模式,通过一个阻塞队列来实现的。任务缓存到阻塞队列,线程从阻塞队列中获取任务。
Worker
Q:线程池如何管理进程?
线程池中每一个线程被封装成Worker
对象,通过对Worker
对象的管理,从而达到对进程管理的目的。下面这个HashSet
存储的是Worker
集合。
1 private final HashSet<Worker> workers = new HashSet <Worker>();
Worker
继承了AQS
,使用AQS
实现了独占锁的功能,从tryAcquire
可以看出禁止重入。其主要含义如下:
独占状态:表示当前线程正在执行任务,则不应该中断线程。
空闲状态:没有在处理任务,可以对线程中断。
其它注意点:
线程池在执行shutdown
方法或tryTerminate
方法时会调用interruptIdleWorkers
方法来中断空闲的线程,interruptIdleWorkers
方法会使用tryLock
方法来判断线程池中的线程是否是空闲状态;
设置成不可重入的原因:不希望任务在运行时重新获得锁,从而调用一些会中断运行时线程的方法。
Worker
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
runWorker
在Worker
类中的run()
调用该方法来执行任务,主要逻辑如下:
while循环通过getTask()
获取任务。
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态(因为需要继续运行)。
调用task.run()
运行任务
如果阻塞队列中没有任务可执行则跳出循环,执行processWorkerExit()
其代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
参考
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html