线程的创建和销毁都需要映射到操作系统,因此其代价是比较高昂的。出于避免频繁创建、销毁线程以及方便线程管理的需要,线程池应运而生。
在项目工程中,基于池化思想的技术应用很多,例如基于线程池的任务并发执行,中间件服务的连接池配置,通过对共享资源的管理,降低资源的占用消耗,提升效率和服务性能。
线程池
线程池
在内存中频繁的创建和销毁对象是很影响性能的,而线程作为进程中运行的基本单位,通过线程池的方式重复使用已创建的线程,在任务执行动作上避免或减少线程的频繁创建动作。
线程池中维护多个线程,当收到调度任务时可以避免创建线程直接执行,并以此降低服务资源的消耗,把相对不确定的并发任务管理在相对确定的线程池中,提高系统服务的稳定性。
原理
Executor 接口
public interface Executor {
void execute(Runnable command);
}
将来会执行命令,任务提交和执行两个动作会被解耦,传入Runnable任务对象即可,线程池会执行相应调度和任务处理。Executor虽然是ThreadPoolExecutor线程池的顶层接口,但是其本身只是抽象了任务的处理思想。
ExecutorService 接口
扩展Executor接口,单个或批量的给任务的执行结果生成Future,并增添任务中断或终止的管理方法。
package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService 抽象类
提供对ExecutorService接口定义的任务执行方法(submit,invokeAll)等默认实现,提供newTaskFor方法用于构建RunnableFuture对象。
ThreadPoolExecutor 类
维护线程池生命周期,管理线程和任务,通过相应调度机制实现任务的并发执行。
ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
}
生命周期
这里从源码开始逐步分析线程池的核心逻辑,首先看看对于生命周期的状态描述,涉及如下几个核心字段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 状态描述
// ctl高3位为111 接受新的任务并处理阻塞队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// ctl高3位为000 不接受新任务但是处理阻塞队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// ctl高3位为001 不接受新任务也不处理阻塞队列中的任务并且中断所有线程池中正在运行的任务
private static final int STOP = 1 << COUNT_BITS;
// ctl高3位为010 所有任务都被终止,工作线程为0
private static final int TIDYING = 2 << COUNT_BITS;
// ctl高3位为011 不接受新任务也不处理阻塞队列中的任务并且中断所有线程池中正在运行的任务
private static final int TERMINATED = 3 << COUNT_BITS;
//在某些情况下用来存储任务,并将任务提供给线程池中的工作线程
private final BlockingQueue<Runnable> workQueue;
//用来对pooSize、corePoolSize、maximumPoolSize、runState、workers修改时候同步
private final ReentrantLock mainLock = new ReentrantLock();
//线程池中所有线程的集合,访问和修改需要mainLock的配合
private final HashSet<Worker> workers = new HashSet<Worker>();
//用来支持waitTemination
private final Condition termination = mainLock.newCondition();
//跟踪线程池中线程的最大值,具体的猜测是为了矫正poolsize,访问和修改需要配合mainLock
private int largestPoolSize;
//已完成任务的数量,在任务处于Terminate状态时才更新,访问和修改需要mainLock的配合
private long completedTaskCount;
//线程工厂,用户可以自定义,以便在想线程池创建线程时附加一些个人操作
private volatile ThreadFactory threadFactory;
//当线程池处于shutdown或者处于饱和时执行的拒绝策略
private volatile RejectedExecutionHandler handler;
//设置线程池中空闲线程等待多时毫秒被回收
private volatile long keepAliveTime;
//指定线程池中的空闲线程是否一段时间被回收,false一直存活
private volatile boolean allowCoreThreadTimeOut;
//核心线程池大小,若allowCoreThreadTimeOut被设置,全部空闲超时被回收的情况下会为0
private volatile int corePoolSize;
//最大线程池大小,不得超过CAPACITY
private volatile int maximumPoolSize;
ctl
控制线程池的状态,包含两个概念字段:workerCount
线程池内有效线程数,runState
运行状态,具体的运行有5种状态描述:
- RUNNING:接受新任务,处理阻塞队列中的任务;
- SHUTDOWN:不接受新任务,处理阻塞队列中已存在的任务;
- STOP:不接受新任务,不处理阻塞队列中的任务,中断正在进行的任务;
- TIDYING:所有任务都已终止,workerCount=0,线程池进入该状态后会执行
terminated()
方法; - TERMINATED: 执行
terminated()
方法完后进入该状态;
线程池总共有5种状态,用int值的高3位来表示线程状态。剩下29个就可以表示线程数量(所以线程数量最大值就是29位上全是1)。
所以ctl的值为:
private static int ctlOf(int rs, int wc) { return rs | wc; }
那么又是读取线程状态和数量的值呢,以下参数c即ctl:
读取状态利用以下方法:
private static int runStateOf(int c) { return c & ~CAPACITY; }
读取线程数量利用以下方法:
private static int workerCountOf(int c) { return c & CAPACITY; }
CAPACITY的值为(00011111 11111111 11111111 11111111);
状态之间的转换逻辑如下:
任务调度
调度逻辑
从整体上看,任务调度被放在三个分支步骤中判断,即:核心线程池、任务队列、拒绝策略,下面再细看每个分支的处理逻辑;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 上文描述的workerCount与runState
int c = ctl.get();
// 核心线程池
if (workerCountOf(c) < corePoolSize) {
// 如果有效线程数小于核心线程数,新建线程并绑定当前任务
if (addWorker(command, true))
return;
/*
添加失败(可能是线程池状态是SHUTDOWN或以上的状态(SHUTDOWN状态下不再接收
新任务),也可能是线程数超过阈值了),就重新获取一下ctl的值,走下面的逻辑
*/
c = ctl.get();
}
// 任务队列
/*
走到这里说明当前线程数大于等于核心线程数,又或者是上面添加核心线程失败中解释的情况
此时就判断一下当前线程池是否是RUNNING状态,如果是的话就往阻塞队列入队
这里offer跟put的区别是如果队列已满,offer不会被阻塞,而是立即返回false
*/
// 如果线程池是运行状态,并且任务添加队列成功
if (isRunning(c) && workQueue.offer(command)) {
// 二次校验如果是非运行状态,则移除该任务,执行拒绝策略
int recheck = ctl.get();
/*
这里会再次检查一次当前线程池是否是RUNNING状态,可能此时线程池已经shutdown了
如果不是RUNNING状态,就删除上面入队的任务,并执行相应的拒绝策略
*/
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果有效线程数是0,执行addWorker添加方法
/*
此时还会去判断一下是否当前的工作线程数已经为0了(可能这些线程在上次workerCountOf
检查后(第10行代码处)被销毁了(allowCoreThreadTimeOut设置为true)),如果是
的话就新创建一个空任务的非核心线程。注意,这里传进addWorker方法的是空任务,因为任务
已经在阻塞队列中存在了,所以这个Worker执行的时候,会直接从阻塞队列中取出任务来执行
所以说这里的意义也就是要保证线程池在RUNNING状态下必须要有一个线程来执行任务
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 拒绝策略
// 再次执行addWorker方法,如果失败则拒绝该任务
/*
走到这里说明线程池不是RUNNING状态,或者阻塞队列已满,此时创建一个非核心线程去执行
如果创建失败,说明线程池的状态已经不是RUNNING了,又或者当前线程数已经大于等于最大线程数了
那么就执行相应的拒绝策略
*/
else if (!addWorker(command, false))
reject(command);
}
Worker线程
线程池内工作线程被封装在Worker类中,继承AQS并实现Runnable接口,维护线程的创建和任务的执行:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 持有线程
final Thread thread;
// 初始化任务
Runnable firstTask;
addWorker(Runnable firstTask, boolean core)
方法,顾名思义,向线程池添加一个带有任务的工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {
// 外层循环:判断线程池状态
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 1.线程池为非Running状态(Running状态则既可以新增核心线程也可以接受任务)
* 2.线程为shutdown状态且firstTask为空且队列不为空
* 3.满足条件1且条件2不满足,则返回false
* 4.条件2解读:线程池为shutdown状态时且任务队列不为空时,可以新增空任务的线程来处理队列中的任务
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层循环:线程池添加核心线程并返回是否添加成功的结果
for (;;) {
int wc = workerCountOf(c);
// 校验线程池已有线程数量是否超限:
// 1.线程池最大上限CAPACITY
// 2.corePoolSize或maximumPoolSize(取决于入参core)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 通过CAS操作使工作线程数+1,跳出外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 线程+1失败,重读ctl
c = ctl.get(); // Re-read ctl
// 如果此时线程池状态不再是running,则重新进行外层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/**
* 核心线程数量+1成功的后续操作:添加到工作线程集合,并启动工作线程
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 下面代码需要加锁:线程池主锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 持锁期间重新检查,线程工厂创建线程失败或获取锁之前关闭的情况发生时,退出
int rs = runStateOf(ctl.get());
// 再次检验线程池是否是running状态或线程池shutdown但线程任务为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程已经启动,则抛出非法线程状态异常
// 为什么会存在这种状态呢?未解决
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加入线程池
workers.add(w);
int s = workers.size();
// 如果当前工作线程数超过线程池曾经出现过的最大线程数,刷新后者值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 工作线程添加成功,启动该线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//线程启动失败,则进入addWorkerFailed
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
getTask()
方法
getTask()
方法是工作线程在while
死循环中获取任务队列中的任务对象的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// allow interrupts
// new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断
w.unlock();
// 线程退出的原因,true是任务导致,false是线程正常退出
boolean completedAbruptly = true;
try {
// 当前任务和从任务队列中获取的任务都为空,方停止循环
while (task != null || (task = getTask()) != null) {
//上锁可以防止在shutdown()时终止正在运行的worker,而不是应对并发
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/**
* 判断1:确保只有在线程处于stop状态且wt未中断时,wt才会被设置中断标识
* 条件1:线程池状态>=STOP,即STOP或TERMINATED
* 条件2:一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,使线程池处于STOP或TERMINATED),
* 条件1与条件2任意满意一个,且wt不是中断状态,则中断wt,否则进入下一步
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //当前线程调用interrupt()中断
try {
//执行前(空方法,由子类重写实现)
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
}
catch (RuntimeException x) {
thrown = x; throw x;
}
catch (Error x) {
thrown = x; throw x;
}
catch (Throwable x) {
thrown = x; throw new Error(x);
}
finally {
//执行后(空方法,由子类重写实现)
afterExecute(task, thrown);
}
}
finally {
task = null;
w.completedTasks++; //完成任务数+1
w.unlock(); //释放锁
}
}
//
completedAbruptly = false;
}
finally {
//处理worker的退出
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
// 记录上一次从队列中拉取的时候是否超时
boolean timedOut = false; // Did the last poll() time out?
// 注意这是死循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 第一个if:如果线程池状态至少为SHUTDOWN,也就是rs >= SHUTDOWN(0),则需要判断两种情况(或逻辑):
// 1. 线程池状态至少为STOP(1),也就是线程池正在停止,一般是调用了shutdownNow()方法
// 2. 任务队列为空
// 如果在线程池至少为SHUTDOWN状态并且满足上面两个条件之一,则工作线程数wc减去1,然后直接返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 跑到这里说明线程池还处于RUNNING状态,重新获取一次工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed临时变量用于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取
// 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务
// 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 第二个if:
// 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,是通过setMaximumPoolSize()方法减少了线程池容量
// 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null
// 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null,
// CAS把线程数减去1失败会进入下一轮循环做重试
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
// 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 跑到这里说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit(Worker w, boolean completedAbruptly)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 1.工作线程-1操作
* 1)如果completedAbruptly 为true,说明工作线程发生异常,那么将正在工作的线程数量-1
* 2)如果completedAbruptly 为false,说明工作线程无任务可以执行,由getTask()执行worker-1操作
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 2.从线程set集合中移除工作线程,该过程需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将该worker已完成的任务数追加到线程池已完成的任务数
completedTaskCount += w.completedTasks;
// HashSet<Worker>中移除该worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 3.根据线程池状态进行判断是否结束线程池
tryTerminate();
/**
* 4.是否需要增加工作线程
* 线程池状态是running 或 shutdown
* 如果当前线程是突然终止的,addWorker()
* 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
* 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
*/
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
核心参数7个
corePoolSize:核心线程数。
maximumPoolSize:最大线程数。
keepAliveTime:空闲线程存活时间。
当允许核心线程超时,也就是allowCoreThreadTimeOut
设置为true的时候,此时keepAliveTime
表示空闲的工作线程的存活周期。
默认情况下不允许核心线程超时,此时keepAliveTime
表示空闲的非核心线程的存活周期。
TimeUnit:时间单位。
BlockingQueue:线程池任务队列。
ThreadFactory:创建线程的工厂。
RejectedExecutionHandler:拒绝策略。
defaultThreadFactory默认线程工厂
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
默认拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
ThreadPoolExecutor
提供了动态调整线程池容量大小的方法:
- 「setCorePoolSize」:设置核心池大小
- 「setMaximumPoolSize」:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor
进行线程赋值,还可能立即创建新的线程来执行任务。
拒绝策略:当线程池的任务超出线程池队列可以存储的最大值之后,执行的策略。默认的拒绝策略有以下 4 种:
- AbortPolicy:拒绝并抛出异常。
- CallerRunsPolicy:使用当前调用的线程来执行此任务。
- DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务。
- DiscardPolicy:忽略并抛弃当前任务。
线程池如何配置合理参数
线程数
(1)CPU密集型:
定义:CPU密集型的意思就是该任务需要大量运算,而没有阻塞,CPU一直全速运行。
CPU密集型任务只有在真正的多核CPU上才可能得到加速(通过多线程)。
CPU密集型任务配置尽可能少的线程数。
CPU密集型线程数配置公式:(CPU核数+1)个线程的线程池
(2)IO密集型:
定义:IO密集型,即该任务需要大量的IO,即大量的阻塞。
在单线程上运行IO密集型任务会导致浪费大量的CPU运算能力浪费在等待。
所以IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要利用了被浪费掉的阻塞时间。
第一种配置方式:
由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程。
配置公式:CPU核数 * 2。
第二种配置方式:
IO密集型时,大部分线程都阻塞,故需要多配置线程数。
配置公式:CPU核数 / (1 – 阻塞系数)(0.8~0.9之间)
比如:8核 / (1 – 0.9) = 80个线程数
其余思路:
核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定,例如:执行一个任务需要0.1秒,系统百分之80的时间每秒都会产生100个任务,那么要想在1秒内处理完这100个任务,就需要10个线程,此时我们就可以设计核心线程数为10;
任务队列长度(workQueue)
任务队列长度一般设计为:核心线程数/单个任务执行时间*2
即可;例如上面的场景中,核心线程数设计为10,单个任务执行时间为0.1秒,则队列长度可以设计为200;
最大线程数
最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定:例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么,最大线程数=(最大任务数-任务队列长度)*单个任务执行时间
;既: 最大线程数=(1000-200)*0.1=80个
;
最大空闲时间(keepAliveTime)
此参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,可以根据经验和系统产生任务的时间间隔合理设置一个值即可;
引用