測(cè)試代碼
/**
* @program: springbootclient2
* @description: 自定義線程池,擴(kuò)展任務(wù)
* @create: 2020-02-27 10:46
*/
@Slf4j
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//裝飾線程池中任務(wù)
@Override
protected void beforeExecute(Thread t, Runnable r) {
//打印線程信息(當(dāng)前線程是什么扮碧?)
log.info("執(zhí)行前的方法..." + Thread.currentThread());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
log.info("執(zhí)行后的方法..." + Thread.currentThread());
}
}
/**
* @program: springbootclient2
* @description: 探索Java線程池的實(shí)現(xiàn)原理
* @create: 2020-02-26 10:33
*/
@Slf4j
public class TestThreadPoolExecutor {
public static void main(String[] args) {
//創(chuàng)建了一個(gè)線程池
ThreadPoolExecutor pool =
new MyThreadPoolExecutor(1, 3,
200, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
//打印日志(核心線程執(zhí)行)
pool.execute(() -> {
try {
Thread.sleep(1000 * 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("日志打印...哈哈哈");
});
//再次打印日志(阻塞隊(duì)列執(zhí)行)
pool.execute(() -> {
log.info("日志再打印...");
});
//再次打印日志(最大線程執(zhí)行)
pool.execute(() -> {
log.info("日志再打印...");
});
}
}
實(shí)現(xiàn)原理
用戶向線程池提交一個(gè)任務(wù)(實(shí)現(xiàn)Runnable接口)后
- 若小于核心線程數(shù),那么直接開啟一個(gè)線程執(zhí)行;
- 若大于核心線程數(shù)浩淘,則將任務(wù)放入阻塞隊(duì)列中;
- 若阻塞隊(duì)列滿了吴攒,則會(huì)使用最大線程數(shù)张抄,繼續(xù)開啟線程執(zhí)行任務(wù);
- 若最大線程數(shù)滿了舶斧,那么采取拒絕策略去拒絕任務(wù)欣鳖;
而執(zhí)行任務(wù)的流程:創(chuàng)建Worker線程,將傳入的任務(wù)交給Worker線程去執(zhí)行茴厉,而Worker線程是ThreadFactory產(chǎn)生的泽台,若傳入的任務(wù)為null,則會(huì)通過getTask()
去workQueue中獲取任務(wù)去執(zhí)行矾缓。
源碼中的細(xì)節(jié):
- 維護(hù)了一個(gè)AtomicInteger的ctl怀酷,前3位標(biāo)識(shí)線程池的狀態(tài),后29位表示線程池中線程的數(shù)量嗜闻。使用CAS來修改參數(shù)蜕依;
- Worker線程使用裝飾器的模式增強(qiáng)傳入的任務(wù),并預(yù)留兩個(gè)鉤子方法擴(kuò)展。
- 使用HashSet來保存Worker線程样眠;
- 核心線程數(shù)存活時(shí)間的實(shí)現(xiàn):使用CAS+自旋友瘤,首先去隊(duì)列中獲取元素,若超時(shí)后檐束,修改標(biāo)識(shí)位辫秧。再次循環(huán),若線程池中存在線程且任務(wù)隊(duì)列中沒有任務(wù)被丧,那么通過CAS修改ctl中當(dāng)前線程的數(shù)量盟戏,跳出方法去銷毀線程。
源碼分析
ThreadPoolExecutor關(guān)鍵屬性
//存放當(dāng)前運(yùn)行的worker數(shù)量以及線程池狀態(tài)
//int是32位的甥桂,這里把int的高3位拿來充當(dāng)線程池狀態(tài)標(biāo)識(shí)位柿究,后29位拿來充當(dāng)當(dāng)前運(yùn)行的worker數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任務(wù)的阻塞隊(duì)列
private final BlockingQueue<Runnable> workQueue;
//存放工作線程(worker)的集合,用set來存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//歷史達(dá)到的worker數(shù)最大值
private int largestPoolSize;
//當(dāng)隊(duì)列滿了并且worker的數(shù)量達(dá)到maxSize的時(shí)候,執(zhí)行具體的拒絕策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存時(shí)間
private volatile long keepAliveTime;
//核心線程數(shù)量
private volatile int corePoolSize;
//最大線程的數(shù)量,一般當(dāng)workQueue滿了才會(huì)用到這個(gè)參數(shù)
private volatile int maximumPoolSize;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//workerCountOf(c)會(huì)獲取當(dāng)前正在運(yùn)行的worker數(shù)量黄选。
if (workerCountOf(c) < corePoolSize) {
//如果小于核心線程數(shù)蝇摸,創(chuàng)建一個(gè)worker然后直接執(zhí)行該任務(wù)。
if (addWorker(command, true))
return;
c = ctl.get();
}
//當(dāng)核心線程數(shù)滿的時(shí)候糕簿,會(huì)執(zhí)行該判斷探入,判斷線程池的狀態(tài)為運(yùn)行態(tài)時(shí)
//會(huì)將任務(wù)放入到queue中(若queue滿了,會(huì)返回false)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//使用最大線程去執(zhí)行懂诗,若執(zhí)行失敗蜂嗽,返回false,將執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
根據(jù)傳入的任務(wù)殃恒,創(chuàng)建worker工作線程植旧,并運(yùn)行:
//firstTask是傳入線程池的任務(wù),core是使用核心線程去執(zhí)行离唐,還是最大線程去執(zhí)行
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//先獲取線程池的狀態(tài)
int c = ctl.get();
//rs即線程池的狀態(tài)(ctl的前3位表示)
int rs = runStateOf(c);
// 如果線程池是關(guān)閉的(SHUTDOWN=0)病附,或者workQueue隊(duì)列非空,直接返回false亥鬓,跳出方法
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取worker的數(shù)量
int wc = workerCountOf(c);
//第一個(gè)判斷是:wc>=536870911
//根據(jù)入?yún)ore完沪,判斷當(dāng)前線程與核心線程數(shù)/最大線程數(shù)去比較。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試修改ctl的workerCount的值(+1)嵌戈,這里使用的是CAS覆积,如果失敗,繼續(xù)下一次重試熟呛,直到獲取成功為止宽档。
if (compareAndIncrementWorkerCount(c))
//如果設(shè)置成功就跳出外層的for循環(huán)
break retry;
//重讀一次ctl,判斷如果線程池的狀態(tài)改變庵朝,會(huì)重新循環(huán)一次吗冤。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//執(zhí)行到此處時(shí)又厉,線程的數(shù)量+1,但實(shí)際上未開啟新線程椎瘟,下面是創(chuàng)建新worker線程覆致。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//創(chuàng)建一個(gè)worker,將提交上來的任務(wù)直接交給worker肺蔚。
w = new Worker(firstTask);
//獲取Worker線程中的thread對(duì)象篷朵。
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//新增Worker線程并存入HashSet中(加鎖,該動(dòng)作是串行畫的)
mainLock.lock();
try {
//獲取線程池的狀態(tài)
int rs = runStateOf(ctl.get());
//線程池沒有中斷或者線程池已經(jīng)中斷婆排,但是線程還持有任務(wù)均往下執(zhí)行(否則該方法不會(huì)做操作)。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //如果worker的線程線程已經(jīng)啟動(dòng)笔链,拋異常
throw new IllegalThreadStateException();
//添加新建的worker到HashSet<Worker>中
workers.add(w);
int s = workers.size();
//更新歷史worker數(shù)量的最大值(和Worker Set容量進(jìn)行對(duì)比)
if (s > largestPoolSize)
largestPoolSize = s;
//設(shè)置新增標(biāo)志位
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果worker是新增的段只,就啟動(dòng)該線程。
if (workerAdded) {
t.start();
//成功啟動(dòng)線程鉴扫,設(shè)置對(duì)應(yīng)的標(biāo)志位
workerStarted = true;
}
}
} finally {
//啟動(dòng)失敗赞枕,就會(huì)觸發(fā)相應(yīng)的方法。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2. Worker結(jié)構(gòu)
Worker是ThreadPoolExecutor內(nèi)部定義的一個(gè)內(nèi)部類
//實(shí)現(xiàn)了Runnable接口坪创,所以可以作為線程使用
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//運(yùn)行的線程炕婶,前面addWorker方法中就是直接通過啟動(dòng)這個(gè)線程來啟動(dòng)這個(gè)worker。
final Thread thread;
//當(dāng)一個(gè)worker剛創(chuàng)建時(shí)莱预,就會(huì)嘗試執(zhí)行這個(gè)任務(wù)
Runnable firstTask;
//記錄完成任務(wù)的數(shù)量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//創(chuàng)建一個(gè)Thread柠掂,將自己設(shè)置給他,后面這個(gè)thread啟動(dòng)的時(shí)候依沮,也就是執(zhí)行這個(gè)worker涯贞。
this.thread = getThreadFactory().newThread(this);
}
//使用裝飾器的模式去擴(kuò)展了run()方法。
public void run() {
//調(diào)用了ThreadPoolExecutor的runWorker方法危喉;
runWorker(this);
}
...
}
當(dāng)worker工作線程執(zhí)行run方法時(shí)宋渔,實(shí)際上會(huì)執(zhí)行該方法:
worker會(huì)判讀自己是否持有任務(wù),若未持有任務(wù)辜限,會(huì)通過getTask()方法去workQueue中獲取任務(wù)(Runnable任務(wù))
final void runWorker(Worker w) {
//獲取到當(dāng)前線程
Thread wt = Thread.currentThread();
//獲取到worker中的任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
//執(zhí)行unlock方法皇拣,允許其他線程來中斷自己
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果前面firstTask有值,那么直接執(zhí)行這個(gè)任務(wù)薄嫡;
//如果沒有具體的任務(wù)氧急,就執(zhí)行g(shù)etTask()方法從隊(duì)列中獲取任務(wù);
while (task != null || (task = getTask()) != null) {
//執(zhí)行任務(wù)前先鎖住岂座,這里的作用就是給shutdown()判斷是否有worker在執(zhí)行中态蒂。
//shutdown方法會(huì)嘗試給線程加鎖,如果線程在執(zhí)行费什,就不會(huì)中斷它钾恢。
w.lock();
//詳見下面分析
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執(zhí)行任務(wù)前被調(diào)用手素,Spring預(yù)留的方法,可擴(kuò)展
beforeExecute(wt, task);
Throwable thrown = null;
try {
//正常調(diào)用run()方法瘩蚪。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執(zhí)行任務(wù)后調(diào)用泉懦,預(yù)留的方法,可擴(kuò)展
afterExecute(task, thrown);
}
} finally {
task = null;
//記錄完成的任務(wù)數(shù)量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
線程的中斷:
- stop方法:是一個(gè)過時(shí)的方法疹瘦,不應(yīng)該在使用這種方法去中斷線程崩哩;
- isInterrupted方法:是Thread類的普通方法,會(huì)返回調(diào)用方法的線程類狀態(tài)言沐。
- interrupted方法:是Thread類的靜態(tài)方法邓嘹,返回的是調(diào)用方法的線程的狀態(tài),interrupted方法會(huì)清除線程的中斷狀態(tài)险胰。
- interrupt方法:用來中斷線程汹押,也就是將中斷標(biāo)志設(shè)置為true的方法;
所謂的中斷起便,只是將中斷標(biāo)識(shí)設(shè)置了下棚贾,并沒有真正的中斷線程的運(yùn)行,一般我們需要自己檢查線程的中斷狀態(tài)榆综,并設(shè)計(jì)如何中斷妙痹。
方法sleep()
、wait
以及join
會(huì)對(duì)中斷標(biāo)識(shí)有所處理鼻疮,當(dāng)線程中斷標(biāo)識(shí)為true時(shí)怯伊,會(huì)拋出異常。
關(guān)閉線程池的方法:
- shutdown方法:告訴線程池拒絕接受新的任務(wù)陋守,但是已經(jīng)開始執(zhí)行的以及進(jìn)入隊(duì)列中的任務(wù)將會(huì)完成執(zhí)行震贵;
- shutdownNow方法:也是告訴線程池拒絕新的任務(wù),但是它會(huì)試圖將已經(jīng)開始執(zhí)行的任務(wù)以及隊(duì)列中的任務(wù)取消水评。這種取消是通過中斷線程來實(shí)現(xiàn)的猩系。也就是說我們的任務(wù)中沒有針對(duì)線程中斷做處理的情況下,在實(shí)際的使用體驗(yàn)上中燥,shutdownNow與shutdown效果是相同的寇甸。
當(dāng)調(diào)用shutdownNow()方法時(shí),線程池會(huì)變?yōu)閟top狀態(tài)疗涉。
runStateAtLeast(ctl.get(), STOP)
為true(即線程池被shutdownNow)拿霉,那么如果線程沒有中斷,確保線程被中斷runStateAtLeast(ctl.get(), STOP)
為false(線程池沒有被shutdownNow)咱扣。在線程沒有被中斷的情況下绽淘,不去中斷線程。當(dāng)然這種情況需要重新檢查shutdownNow的風(fēng)險(xiǎn)闹伪,當(dāng)清理中斷線程時(shí)沪铭。
代碼分析:
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中斷worker線程壮池。
wt.interrupt();
在阻塞隊(duì)列中獲取任務(wù)
在上面的java.util.concurrent.ThreadPoolExecutor#runWorker
方法中task的獲取是Worker中的firstTask屬性或者getTask()方法來完成獲取的。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
//獲取到線程池的狀態(tài)
int rs = runStateOf(c);
//如果返回null杀怠,那么上面runWorker()方法會(huì)跳出while循環(huán)椰憋,然后執(zhí)行銷毀worker線程。
//SHUTDOWN:表示執(zhí)行了shutdown()方法赔退;
//STOP:表示執(zhí)行了shutdownNow()方法橙依;
//如果執(zhí)行了shutdown方法且workQueue為空,那么ctl線程數(shù)量-1硕旗;
//如果執(zhí)行了shutdownNow方法窗骑,那么ctl線程數(shù)量也-1;
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取當(dāng)前正在運(yùn)行中的worker數(shù)量
int wc = workerCountOf(c);
// 是否允許核心線程超時(shí)或者當(dāng)前運(yùn)行的線程數(shù)超過了核心線程數(shù)漆枚。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//timeOut默認(rèn)false慧域。若配置keepTimeOut并未獲取到任務(wù)時(shí),會(huì)置為true浪读。此時(shí),若存在工作線程辛藻,且隊(duì)列為null碘橘,那么就銷毀該線程。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//通過CAS來設(shè)置workerCount吱肌,如果存在多個(gè)線程競(jìng)爭(zhēng)痘拆,只有一個(gè)可以設(shè)置成功。
//如果沒有設(shè)置后才能給氮墨,就進(jìn)入下一次循環(huán)纺蛆。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//在workQueue中取任務(wù),poll方法存在等待的超時(shí)時(shí)間keepAliveTime规揪,但是在規(guī)定時(shí)間內(nèi)沒有在阻塞隊(duì)列中獲取任務(wù)桥氏,那么timedOut會(huì)被置為true。
//take()方法會(huì)一直等待猛铅。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果r為null字支,就設(shè)置timedOut為true(注意,方法并未跳出奸忽,開始自旋)堕伪;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
推薦閱讀
歷史文章
JAVA并發(fā)(1)—java對(duì)象布局
JAVA并發(fā)(2)—PV機(jī)制與monitor(管程)機(jī)制
JAVA并發(fā)(3)—線程運(yùn)行時(shí)發(fā)生GC,會(huì)回收ThreadLocal弱引用的key嗎栗菜?
JAVA并發(fā)(4)— ThreadLocal源碼角度分析是否真正能造成內(nèi)存溢出欠雌!
JAVA并發(fā)(5)— 多線程順序的打印出A,B疙筹,C(線程間的協(xié)作)
JAVA并發(fā)(6)— AQS源碼解析(獨(dú)占鎖-加鎖過程)
JAVA并發(fā)(7)—AQS源碼解析(獨(dú)占鎖-解鎖過程)
JAVA并發(fā)(8)—AQS公平鎖為什么會(huì)比非公平鎖效率低(源碼分析)
JAVA并發(fā)(9)— 共享鎖的獲取與釋放
JAVA并發(fā)(10)—interrupt喚醒掛起線程
JAVA并發(fā)(11)—AQS源碼Condition阻塞和喚醒
JAVA并發(fā)(12)— Lock實(shí)現(xiàn)生產(chǎn)者消費(fèi)者