Worker類分析
Worker類從類的結(jié)構(gòu)上來看,繼承了AQS(AbstractQueuedSynchronizer類)并實現(xiàn)了Runnable接口。本質(zhì)上,Worker類既是一個同步組件,也是一個執(zhí)行任務(wù)的線程。接下來,我們看下Worker類的源碼,如下所示。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {private static final long serialVersionUID = 6138294804551838833L;//執(zhí)行任務(wù)的線程類final Thread thread;//初始化執(zhí)行的任務(wù),第一次執(zhí)行的任務(wù)Runnable firstTask;//完成任務(wù)的計數(shù)volatile long completedTasks;//Worker類的構(gòu)造方法,初始化任務(wù)并調(diào)用線程工廠創(chuàng)建執(zhí)行任務(wù)的線程Worker(Runnable firstTask) {setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//重寫Runnable接口的run()方法public void run() {//調(diào)用ThreadPoolExecutor類的runWorker(Worker)方法runWorker(this);}//檢測是否是否獲取到鎖//state=0表示未獲取到鎖//state=1表示已獲取到鎖protected boolean isHeldExclusively() {return getState() != 0;}//使用AQS設(shè)置線程狀態(tài)protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//嘗試釋放鎖protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}復(fù)制代碼
在Worker類的構(gòu)造方法中,可以看出,首先將同步狀態(tài)state設(shè)置為-1,設(shè)置為-1是為了防止runWorker方法運行之前被中斷。這是因為如果其他線程調(diào)用線程池的shutdownNow()方法時,如果Worker類中的state狀態(tài)的值大于0,則會中斷線程,如果state狀態(tài)的值為-1,則不會中斷線程。
Worker類實現(xiàn)了Runnable接口,需要重寫run方法,而Worker的run方法本質(zhì)上調(diào)用的是ThreadPoolExecutor類的runWorker方法,在runWorker方法中,會首先調(diào)用unlock方法,該方法會將state置為0,所以這個時候調(diào)用shutDownNow方法就會中斷當(dāng)前線程,而這個時候已經(jīng)進(jìn)入了runWork方法,就不會在還沒有執(zhí)行runWorker方法的時候就中斷線程。
注意:大家需要重點理解Worker類的實現(xiàn)。
Worker類中調(diào)用了ThreadPoolExecutor類的runWorker(Worker)方法。接下來,我們一起看下ThreadPoolExecutor類的runWorker(Worker)方法的實現(xiàn)。
runWorker(Worker)方法
首先,我們看下RunWorker(Worker)方法的源碼,如下所示。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//釋放鎖,將state設(shè)置為0,允許中斷任務(wù)的執(zhí)行w.unlock();boolean completedAbruptly = true;try {//如果任務(wù)不為空,或者從任務(wù)隊列中獲取的任務(wù)不為空,則執(zhí)行while循環(huán)while (task != null || (task = getTask()) != null) {//如果任務(wù)不為空,則獲取Worker工作線程的獨占鎖w.lock();//如果線程已經(jīng)停止,或者中斷線程后線程終止并且沒有成功中斷線程//大家好好理解下這個邏輯if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())//中斷線程wt.interrupt();try {//執(zhí)行任務(wù)前執(zhí)行的邏輯beforeExecute(wt, task);Throwable thrown = null;try {//調(diào)用Runable接口的run方法執(zhí)行任務(wù)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//執(zhí)行任務(wù)后執(zhí)行的邏輯afterExecute(task, thrown);}} finally {//任務(wù)執(zhí)行完成后,將其設(shè)置為空task = null;//完成的任務(wù)數(shù)量加1w.completedTasks++;//釋放工作線程獲得的鎖w.unlock();}}completedAbruptly = false;} finally {//執(zhí)行退出Worker線程的邏輯processWorkerExit(w, completedAbruptly);}}復(fù)制代碼
這里,我們拆解runWorker(Worker)方法。
(1)獲取當(dāng)前線程的句柄和工作線程中的任務(wù),并將工作線程中的任務(wù)設(shè)置為空,執(zhí)行unlock方法釋放鎖,將state狀態(tài)設(shè)置為0,此時可以中斷工作線程,代碼如下所示。
Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//釋放鎖,將state設(shè)置為0,允許中斷任務(wù)的執(zhí)行w.unlock();復(fù)制代碼
(2)在while循環(huán)中進(jìn)行判斷,如果任務(wù)不為空,或者從任務(wù)隊列中獲取的任務(wù)不為空,則執(zhí)行while循環(huán),否則,調(diào)用processWorkerExit(Worker, boolean)方法退出Worker工作線程。
while (task != null || (task = getTask()) != null)復(fù)制代碼
(3)如果滿足while的循環(huán)條件,首先獲取工作線程內(nèi)部的獨占鎖,并執(zhí)行一系列的邏輯判斷來檢測是否需要中斷當(dāng)前線程的執(zhí)行,代碼如下所示。
//如果任務(wù)不為空,則獲取Worker工作線程的獨占鎖w.lock();//如果線程已經(jīng)停止,或者中斷線程后線程終止并且沒有成功中斷線程//大家好好理解下這個邏輯if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())//中斷線程wt.interrupt();復(fù)制代碼
(4)調(diào)用執(zhí)行任務(wù)前執(zhí)行的邏輯,如下所示
//執(zhí)行任務(wù)前執(zhí)行的邏輯beforeExecute(wt, task);復(fù)制代碼
(5)調(diào)用Runable接口的run方法執(zhí)行任務(wù)
//調(diào)用Runable接口的run方法執(zhí)行任務(wù)task.run();復(fù)制代碼
(6)調(diào)用執(zhí)行任務(wù)后執(zhí)行的邏輯
//執(zhí)行任務(wù)后執(zhí)行的邏輯afterExecute(task, thrown);復(fù)制代碼
(7)將完成的任務(wù)設(shè)置為空,完成的任務(wù)數(shù)量加1并釋放工作線程的鎖。
//任務(wù)執(zhí)行完成后,將其設(shè)置為空task = null;//完成的任務(wù)數(shù)量加1w.completedTasks++;//釋放工作線程獲得的鎖w.unlock();復(fù)制代碼
(8)退出Worker線程的執(zhí)行,如下所示
//執(zhí)行退出Worker線程的邏輯processWorkerExit(w, completedAbruptly);復(fù)制代碼
從代碼分析上可以看到,當(dāng)從Worker線程中獲取的任務(wù)為空時,會調(diào)用getTask()方法從任務(wù)隊列中獲取任務(wù),接下來,我們看下getTask()方法的實現(xiàn)。
getTask()方法
我們先來看下getTask()方法的源代碼,如下所示。
private Runnable getTask() {//輪詢是否超時的標(biāo)識boolean timedOut = false;//自旋for循環(huán)for (;;) {//獲取ctlint c = ctl.get();//獲取線程池的狀態(tài)int rs = runStateOf(c);//檢測任務(wù)隊列是否在線程池停止或關(guān)閉的時候為空//也就是說任務(wù)隊列是否在線程池未正常運行時為空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//減少Worker線程的數(shù)量decrementWorkerCount();return null;}//獲取線程池中線程的數(shù)量int wc = workerCountOf(c);//檢測當(dāng)前線程池中的線程數(shù)量是否大于corePoolSize的值或者是否正在等待執(zhí)行任務(wù)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果線程池中的線程數(shù)量大于corePoolSize//獲取大于corePoolSize或者是否正在等待執(zhí)行任務(wù)并且輪詢超時//并且當(dāng)前線程池中的線程數(shù)量大于1或者任務(wù)隊列為空if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//成功減少線程池中的工作線程數(shù)量if (compareAndDecrementWorkerCount(c))return null;continue;}try {//從任務(wù)隊列中獲取任務(wù)Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//任務(wù)不為空直接返回任務(wù)if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}復(fù)制代碼
getTask()方法的邏輯比較簡單,大家看源碼就可以了,我這里就不重復(fù)描述了。
接下來,我們看下在正式調(diào)用Runnable的run()方法前后,執(zhí)行的beforeExecute方法和afterExecute方法。
beforeExecute(Thread, Runnable)方法
beforeExecute(Thread, Runnable)方法的源代碼如下所示。
protected void beforeExecute(Thread t, Runnable r) { }復(fù)制代碼
可以看到,beforeExecute(Thread, Runnable)方法的方法體為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫beforeExecute(Thread, Runnable)方法,使得線程池正式執(zhí)行任務(wù)之前,執(zhí)行我們自己定義的業(yè)務(wù)邏輯。
afterExecute(Runnable, Throwable)方法
afterExecute(Runnable, Throwable)方法的源代碼如下所示。
protected void afterExecute(Runnable r, Throwable t) { }復(fù)制代碼
可以看到,afterExecute(Runnable, Throwable)方法的方法體同樣為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫afterExecute(Runnable, Throwable)方法,使得線程池在執(zhí)行任務(wù)之后執(zhí)行我們自己定義的業(yè)務(wù)邏輯。
接下來,就是退出工作線程的processWorkerExit(Worker, boolean)方法。
processWorkerExit(Worker, boolean)方法
processWorkerExit(Worker, boolean)方法的邏輯主要是執(zhí)行退出Worker線程,并且對一些資源進(jìn)行清理,源代碼如下所示。
private void processWorkerExit(Worker w, boolean completedAbruptly) {//執(zhí)行過程中出現(xiàn)了異常,突然中斷if (completedAbruptly)//將工作線程的數(shù)量減1decrementWorkerCount();//獲取全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//累加完成的任務(wù)數(shù)量completedTaskCount += w.completedTasks;//將完成的任務(wù)從workers集合中移除workers.remove(w);} finally {//釋放鎖mainLock.unlock();}//嘗試終止工作線程的執(zhí)行tryTerminate();//獲取ctlint c = ctl.get();//判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN)if (runStateLessThan(c, STOP)) {//如果沒有突然中斷完成if (!completedAbruptly) {//如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min為0并且工作隊列不為空if (min == 0 && ! workQueue.isEmpty())//min的值設(shè)置為1min = 1;//如果線程池中的線程數(shù)量大于min的值if (workerCountOf(c) >= min)//返回,不再執(zhí)行程序return; }//調(diào)用addWorker方法addWorker(null, false);}}復(fù)制代碼
接下來,我們拆解processWorkerExit(Worker, boolean)方法。
(1)執(zhí)行過程中出現(xiàn)了異常,突然中斷執(zhí)行,則將工作線程數(shù)量減1,如下所示。
//執(zhí)行過程中出現(xiàn)了異常,突然中斷if (completedAbruptly)//將工作線程的數(shù)量減1decrementWorkerCount();復(fù)制代碼
(2)獲取鎖累加完成的任務(wù)數(shù)量,并將完成的任務(wù)從workers集合中移除,并釋放,如下所示。
//獲取全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//累加完成的任務(wù)數(shù)量completedTaskCount += w.completedTasks;//將完成的任務(wù)從workers集合中移除workers.remove(w);} finally {//釋放鎖mainLock.unlock();}復(fù)制代碼
(3)嘗試終止工作線程的執(zhí)行
//嘗試終止工作線程的執(zhí)行tryTerminate();復(fù)制代碼
(4)處判斷當(dāng)前線程池中的線程個數(shù)是否小于核心線程數(shù),如果是,需要新增一個線程保證有足夠的線程可以執(zhí)行任務(wù)隊列中的任務(wù)或者提交的任務(wù)。
//獲取ctlint c = ctl.get();//判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN)if (runStateLessThan(c, STOP)) {//如果沒有突然中斷完成if (!completedAbruptly) {//如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min為0并且工作隊列不為空if (min == 0 && ! workQueue.isEmpty())//min的值設(shè)置為1min = 1;//如果線程池中的線程數(shù)量大于min的值if (workerCountOf(c) >= min)//返回,不再執(zhí)行程序return; }//調(diào)用addWorker方法addWorker(null, false);}復(fù)制代碼
接下來,我們看下tryTerminate()方法。
tryTerminate()方法
tryTerminate()方法的源代碼如下所示。
final void tryTerminate() {//自旋for循環(huán)for (;;) {//獲取ctlint c = ctl.get();//如果線程池的狀態(tài)為RUNNING//或者狀態(tài)大于TIDYING//或者狀態(tài)為SHUTDOWN并且任務(wù)隊列為空//直接返回程序,不再執(zhí)行后續(xù)邏輯if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果當(dāng)前線程池中的線程數(shù)量不等于0if (workerCountOf(c) != 0) { //中斷線程的執(zhí)行interruptIdleWorkers(ONLY_ONE);return;}//獲取線程池的全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//通過CAS將線程池的狀態(tài)設(shè)置為TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//調(diào)用terminated()方法terminated();} finally {//將線程池狀態(tài)設(shè)置為TERMINATEDctl.set(ctlOf(TERMINATED, 0));//喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程termination.signalAll();}return;}} finally {//釋放鎖mainLock.unlock();}}}復(fù)制代碼
(1)獲取ctl,根據(jù)情況設(shè)置線程池狀態(tài)或者中斷線程的執(zhí)行,并返回。
//獲取ctlint c = ctl.get();//如果線程池的狀態(tài)為RUNNING//或者狀態(tài)大于TIDYING//或者狀態(tài)為SHUTDOWN并且任務(wù)隊列為空//直接返回程序,不再執(zhí)行后續(xù)邏輯if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果當(dāng)前線程池中的線程數(shù)量不等于0if (workerCountOf(c) != 0) { //中斷線程的執(zhí)行interruptIdleWorkers(ONLY_ONE);return;}復(fù)制代碼
(2)獲取全局鎖,通過CAS設(shè)置線程池的狀態(tài),調(diào)用terminated()方法執(zhí)行邏輯,最終將線程池的狀態(tài)設(shè)置為TERMINATED,喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程,最終釋放鎖,如下所示。
//獲取線程池的全局final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//通過CAS將線程池的狀態(tài)設(shè)置為TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//調(diào)用terminated()方法terminated();} finally {//將線程池狀態(tài)設(shè)置為TERMINATEDctl.set(ctlOf(TERMINATED, 0));//喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程termination.signalAll();}return;}} finally {//釋放鎖mainLock.unlock();}復(fù)制代碼
接下來,看下terminated()方法。
terminated()方法
terminated()方法的源代碼如下所示。
protected void terminated() { }復(fù)制代碼
可以看到,terminated()方法的方法體為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫terminated()方法,值得Worker線程調(diào)用tryTerminate()方法時執(zhí)行我們自己定義的terminated()方法的業(yè)務(wù)邏輯。