基于jdk8的线程池分析

2021/04/29 760点热度 0人点赞 0条评论

这可能是最简短的线程池分析文章了。

顶层设计,定义执行接口

Interface Executor(){
void execute(Runnable command);

}

ExecutorService,定义控制接口

interface ExecutorService extends Executor{

}

图片

抽象实现ExecutorService中的大部分方法

abstract class AbstractExecutorService implements ExecutorService{
//此处只有提交
}

图片

ThreadPoolExecutor线程池实现

ThreadPoolExecutor extends AbstractExecutorService{
//保存所有的执行线程(worker
HashSet<Worker> workers = new HashSet<Worker>();
//存放待执行的任务,这块具体由指定的队列实现
BlockingQueue<Runnable> workQueue;
//所有的submit内部最后都走这个execute实现
public void execute(Runnable command) {
//这里有两块操作,能添加worker就添加worker
//核心线程数不够,队列满了,未达到最大线程数
addWorker(command,x);
//核心线程数满了,先往队列里加,加不上才创建worker
workQueue.offer(command)
//无法添加且无法加入队列,就直接拒绝
}
//添加执行worker
private boolean addWorker(Runnable firstTask, boolean core) {
//这里每次都会基础校验和cas校验,防止并发无法创建线程,
retry:
for(;;){
for(;;){
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
try{
//创建一个worker
w = new Worker(firstTask);
final Thread t = w.thread;
try{
//加锁校验,添加到workers集合中
workers.add(w);
}
//添加成功,将对应的线程启动,执行任务
t.start();
}finally{
//失败执行进行释放资源
addWorkerFailed(Worker w)
}


}
//Worker 是对任务和线程的封装
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
//线程启动后会循环执行任务
public void run() {
runWorker(this);
}
}
//循环执行
final void runWorker(Worker w) {
try{
while (task != null || (task = getTask()) != null) {
//执行前的可扩展点
beforeExecute(wt, task);
try{
//执行任务
task.run();
}finally{
//执行后的可扩展点,这块也把异常给吃了
afterExecute(task, thrown);
}
}
//这里会对执行的任务进行统计
}finally{
//异常或者是循环退出都会走这里
processWorkerExit(w, completedAbruptly);
}
}
//获取执行任务,此处决定runWorker的状态
private Runnable getTask() {
//worker的淘汰策略:允许超时或者工作线程>核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//满足淘汰策略且...,就返回null,交由processWorkerExit去处理线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 满足淘汰策略,就等一定的时间poll(),不满足,就一直等待take()
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
}
//处理任务退出(循环获取不到任务的时候)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//异常退出的,不能调整线程数的
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

//不管成功或失败,都执行以下逻辑
//1,计数,2,减去一个线程
completedTaskCount += w.completedTasks;
workers.remove(w);
//如果满足一定的条件,还会添加worker
addWorker(null, false);
}

}

yxkong

这个人很懒,什么都没留下

文章评论