在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    「高并發(fā)」這樣理解線程池中Worker線程的執(zhí)行流程才正確

    Worker類分析

    Worker類從類的結(jié)構(gòu)上來看,繼承了AQS(AbstractQueuedSynchronizer類)并實現(xiàn)了Runnable接口。本質(zhì)上,Worker類既是一個同步組件,也是一個執(zhí)行任務(wù)的線程。接下來,我們看下Worker類的源碼,如下所示。

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {private static final long serialVersionUID = 6138294804551838833L;//執(zhí)行任務(wù)的線程類final Thread thread;//初始化執(zhí)行的任務(wù),第一次執(zhí)行的任務(wù)Runnable firstTask;//完成任務(wù)的計數(shù)volatile long completedTasks;//Worker類的構(gòu)造方法,初始化任務(wù)并調(diào)用線程工廠創(chuàng)建執(zhí)行任務(wù)的線程Worker(Runnable firstTask) {setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//重寫Runnable接口的run()方法public void run() {//調(diào)用ThreadPoolExecutor類的runWorker(Worker)方法runWorker(this);}//檢測是否是否獲取到鎖//state=0表示未獲取到鎖//state=1表示已獲取到鎖protected boolean isHeldExclusively() {return getState() != 0;}//使用AQS設(shè)置線程狀態(tài)protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//嘗試釋放鎖protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}復(fù)制代碼

    在Worker類的構(gòu)造方法中,可以看出,首先將同步狀態(tài)state設(shè)置為-1,設(shè)置為-1是為了防止runWorker方法運行之前被中斷。這是因為如果其他線程調(diào)用線程池的shutdownNow()方法時,如果Worker類中的state狀態(tài)的值大于0,則會中斷線程,如果state狀態(tài)的值為-1,則不會中斷線程。

    Worker類實現(xiàn)了Runnable接口,需要重寫run方法,而Worker的run方法本質(zhì)上調(diào)用的是ThreadPoolExecutor類的runWorker方法,在runWorker方法中,會首先調(diào)用unlock方法,該方法會將state置為0,所以這個時候調(diào)用shutDownNow方法就會中斷當(dāng)前線程,而這個時候已經(jīng)進(jìn)入了runWork方法,就不會在還沒有執(zhí)行runWorker方法的時候就中斷線程。

    注意:大家需要重點理解Worker類的實現(xiàn)。

    Worker類中調(diào)用了ThreadPoolExecutor類的runWorker(Worker)方法。接下來,我們一起看下ThreadPoolExecutor類的runWorker(Worker)方法的實現(xiàn)。

    runWorker(Worker)方法

    首先,我們看下RunWorker(Worker)方法的源碼,如下所示。

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//釋放鎖,將state設(shè)置為0,允許中斷任務(wù)的執(zhí)行w.unlock();boolean completedAbruptly = true;try {//如果任務(wù)不為空,或者從任務(wù)隊列中獲取的任務(wù)不為空,則執(zhí)行while循環(huán)while (task != null || (task = getTask()) != null) {//如果任務(wù)不為空,則獲取Worker工作線程的獨占鎖w.lock();//如果線程已經(jīng)停止,或者中斷線程后線程終止并且沒有成功中斷線程//大家好好理解下這個邏輯if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())//中斷線程wt.interrupt();try {//執(zhí)行任務(wù)前執(zhí)行的邏輯beforeExecute(wt, task);Throwable thrown = null;try {//調(diào)用Runable接口的run方法執(zhí)行任務(wù)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//執(zhí)行任務(wù)后執(zhí)行的邏輯afterExecute(task, thrown);}} finally {//任務(wù)執(zhí)行完成后,將其設(shè)置為空task = null;//完成的任務(wù)數(shù)量加1w.completedTasks++;//釋放工作線程獲得的鎖w.unlock();}}completedAbruptly = false;} finally {//執(zhí)行退出Worker線程的邏輯processWorkerExit(w, completedAbruptly);}}復(fù)制代碼

    這里,我們拆解runWorker(Worker)方法。

    (1)獲取當(dāng)前線程的句柄和工作線程中的任務(wù),并將工作線程中的任務(wù)設(shè)置為空,執(zhí)行unlock方法釋放鎖,將state狀態(tài)設(shè)置為0,此時可以中斷工作線程,代碼如下所示。

    Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//釋放鎖,將state設(shè)置為0,允許中斷任務(wù)的執(zhí)行w.unlock();復(fù)制代碼

    (2)在while循環(huán)中進(jìn)行判斷,如果任務(wù)不為空,或者從任務(wù)隊列中獲取的任務(wù)不為空,則執(zhí)行while循環(huán),否則,調(diào)用processWorkerExit(Worker, boolean)方法退出Worker工作線程。

    while (task != null || (task = getTask()) != null)復(fù)制代碼

    (3)如果滿足while的循環(huán)條件,首先獲取工作線程內(nèi)部的獨占鎖,并執(zhí)行一系列的邏輯判斷來檢測是否需要中斷當(dāng)前線程的執(zhí)行,代碼如下所示。

    //如果任務(wù)不為空,則獲取Worker工作線程的獨占鎖w.lock();//如果線程已經(jīng)停止,或者中斷線程后線程終止并且沒有成功中斷線程//大家好好理解下這個邏輯if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())//中斷線程wt.interrupt();復(fù)制代碼

    (4)調(diào)用執(zhí)行任務(wù)前執(zhí)行的邏輯,如下所示

    //執(zhí)行任務(wù)前執(zhí)行的邏輯beforeExecute(wt, task);復(fù)制代碼

    (5)調(diào)用Runable接口的run方法執(zhí)行任務(wù)

    //調(diào)用Runable接口的run方法執(zhí)行任務(wù)task.run();復(fù)制代碼

    (6)調(diào)用執(zhí)行任務(wù)后執(zhí)行的邏輯

    //執(zhí)行任務(wù)后執(zhí)行的邏輯afterExecute(task, thrown);復(fù)制代碼

    (7)將完成的任務(wù)設(shè)置為空,完成的任務(wù)數(shù)量加1并釋放工作線程的鎖。

    //任務(wù)執(zhí)行完成后,將其設(shè)置為空task = null;//完成的任務(wù)數(shù)量加1w.completedTasks++;//釋放工作線程獲得的鎖w.unlock();復(fù)制代碼

    (8)退出Worker線程的執(zhí)行,如下所示

    //執(zhí)行退出Worker線程的邏輯processWorkerExit(w, completedAbruptly);復(fù)制代碼

    從代碼分析上可以看到,當(dāng)從Worker線程中獲取的任務(wù)為空時,會調(diào)用getTask()方法從任務(wù)隊列中獲取任務(wù),接下來,我們看下getTask()方法的實現(xiàn)。

    getTask()方法

    我們先來看下getTask()方法的源代碼,如下所示。

    private Runnable getTask() {//輪詢是否超時的標(biāo)識boolean timedOut = false;//自旋for循環(huán)for (;;) {//獲取ctlint c = ctl.get();//獲取線程池的狀態(tài)int rs = runStateOf(c);//檢測任務(wù)隊列是否在線程池停止或關(guān)閉的時候為空//也就是說任務(wù)隊列是否在線程池未正常運行時為空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//減少Worker線程的數(shù)量decrementWorkerCount();return null;}//獲取線程池中線程的數(shù)量int wc = workerCountOf(c);//檢測當(dāng)前線程池中的線程數(shù)量是否大于corePoolSize的值或者是否正在等待執(zhí)行任務(wù)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果線程池中的線程數(shù)量大于corePoolSize//獲取大于corePoolSize或者是否正在等待執(zhí)行任務(wù)并且輪詢超時//并且當(dāng)前線程池中的線程數(shù)量大于1或者任務(wù)隊列為空if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//成功減少線程池中的工作線程數(shù)量if (compareAndDecrementWorkerCount(c))return null;continue;}try {//從任務(wù)隊列中獲取任務(wù)Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//任務(wù)不為空直接返回任務(wù)if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}復(fù)制代碼

    getTask()方法的邏輯比較簡單,大家看源碼就可以了,我這里就不重復(fù)描述了。

    接下來,我們看下在正式調(diào)用Runnable的run()方法前后,執(zhí)行的beforeExecute方法和afterExecute方法。

    beforeExecute(Thread, Runnable)方法

    beforeExecute(Thread, Runnable)方法的源代碼如下所示。

    protected void beforeExecute(Thread t, Runnable r) { }復(fù)制代碼

    可以看到,beforeExecute(Thread, Runnable)方法的方法體為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫beforeExecute(Thread, Runnable)方法,使得線程池正式執(zhí)行任務(wù)之前,執(zhí)行我們自己定義的業(yè)務(wù)邏輯。

    afterExecute(Runnable, Throwable)方法

    afterExecute(Runnable, Throwable)方法的源代碼如下所示。

    protected void afterExecute(Runnable r, Throwable t) { }復(fù)制代碼

    可以看到,afterExecute(Runnable, Throwable)方法的方法體同樣為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫afterExecute(Runnable, Throwable)方法,使得線程池在執(zhí)行任務(wù)之后執(zhí)行我們自己定義的業(yè)務(wù)邏輯。

    接下來,就是退出工作線程的processWorkerExit(Worker, boolean)方法。

    processWorkerExit(Worker, boolean)方法

    processWorkerExit(Worker, boolean)方法的邏輯主要是執(zhí)行退出Worker線程,并且對一些資源進(jìn)行清理,源代碼如下所示。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {//執(zhí)行過程中出現(xiàn)了異常,突然中斷if (completedAbruptly)//將工作線程的數(shù)量減1decrementWorkerCount();//獲取全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//累加完成的任務(wù)數(shù)量completedTaskCount += w.completedTasks;//將完成的任務(wù)從workers集合中移除workers.remove(w);} finally {//釋放鎖mainLock.unlock();}//嘗試終止工作線程的執(zhí)行tryTerminate();//獲取ctlint c = ctl.get();//判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN)if (runStateLessThan(c, STOP)) {//如果沒有突然中斷完成if (!completedAbruptly) {//如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min為0并且工作隊列不為空if (min == 0 && ! workQueue.isEmpty())//min的值設(shè)置為1min = 1;//如果線程池中的線程數(shù)量大于min的值if (workerCountOf(c) >= min)//返回,不再執(zhí)行程序return; }//調(diào)用addWorker方法addWorker(null, false);}}復(fù)制代碼

    接下來,我們拆解processWorkerExit(Worker, boolean)方法。

    (1)執(zhí)行過程中出現(xiàn)了異常,突然中斷執(zhí)行,則將工作線程數(shù)量減1,如下所示。

    //執(zhí)行過程中出現(xiàn)了異常,突然中斷if (completedAbruptly)//將工作線程的數(shù)量減1decrementWorkerCount();復(fù)制代碼

    (2)獲取鎖累加完成的任務(wù)數(shù)量,并將完成的任務(wù)從workers集合中移除,并釋放,如下所示。

    //獲取全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//累加完成的任務(wù)數(shù)量completedTaskCount += w.completedTasks;//將完成的任務(wù)從workers集合中移除workers.remove(w);} finally {//釋放鎖mainLock.unlock();}復(fù)制代碼

    (3)嘗試終止工作線程的執(zhí)行

    //嘗試終止工作線程的執(zhí)行tryTerminate();復(fù)制代碼

    (4)處判斷當(dāng)前線程池中的線程個數(shù)是否小于核心線程數(shù),如果是,需要新增一個線程保證有足夠的線程可以執(zhí)行任務(wù)隊列中的任務(wù)或者提交的任務(wù)。

    //獲取ctlint c = ctl.get();//判斷當(dāng)前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN)if (runStateLessThan(c, STOP)) {//如果沒有突然中斷完成if (!completedAbruptly) {//如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min為0并且工作隊列不為空if (min == 0 && ! workQueue.isEmpty())//min的值設(shè)置為1min = 1;//如果線程池中的線程數(shù)量大于min的值if (workerCountOf(c) >= min)//返回,不再執(zhí)行程序return; }//調(diào)用addWorker方法addWorker(null, false);}復(fù)制代碼

    接下來,我們看下tryTerminate()方法。

    tryTerminate()方法

    tryTerminate()方法的源代碼如下所示。

    final void tryTerminate() {//自旋for循環(huán)for (;;) {//獲取ctlint c = ctl.get();//如果線程池的狀態(tài)為RUNNING//或者狀態(tài)大于TIDYING//或者狀態(tài)為SHUTDOWN并且任務(wù)隊列為空//直接返回程序,不再執(zhí)行后續(xù)邏輯if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果當(dāng)前線程池中的線程數(shù)量不等于0if (workerCountOf(c) != 0) { //中斷線程的執(zhí)行interruptIdleWorkers(ONLY_ONE);return;}//獲取線程池的全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//通過CAS將線程池的狀態(tài)設(shè)置為TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//調(diào)用terminated()方法terminated();} finally {//將線程池狀態(tài)設(shè)置為TERMINATEDctl.set(ctlOf(TERMINATED, 0));//喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程termination.signalAll();}return;}} finally {//釋放鎖mainLock.unlock();}}}復(fù)制代碼

    (1)獲取ctl,根據(jù)情況設(shè)置線程池狀態(tài)或者中斷線程的執(zhí)行,并返回。

    //獲取ctlint c = ctl.get();//如果線程池的狀態(tài)為RUNNING//或者狀態(tài)大于TIDYING//或者狀態(tài)為SHUTDOWN并且任務(wù)隊列為空//直接返回程序,不再執(zhí)行后續(xù)邏輯if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果當(dāng)前線程池中的線程數(shù)量不等于0if (workerCountOf(c) != 0) { //中斷線程的執(zhí)行interruptIdleWorkers(ONLY_ONE);return;}復(fù)制代碼

    (2)獲取全局鎖,通過CAS設(shè)置線程池的狀態(tài),調(diào)用terminated()方法執(zhí)行邏輯,最終將線程池的狀態(tài)設(shè)置為TERMINATED,喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程,最終釋放鎖,如下所示。

    //獲取線程池的全局final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//通過CAS將線程池的狀態(tài)設(shè)置為TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//調(diào)用terminated()方法terminated();} finally {//將線程池狀態(tài)設(shè)置為TERMINATEDctl.set(ctlOf(TERMINATED, 0));//喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程termination.signalAll();}return;}} finally {//釋放鎖mainLock.unlock();}復(fù)制代碼

    接下來,看下terminated()方法。

    terminated()方法

    terminated()方法的源代碼如下所示。

    protected void terminated() { }復(fù)制代碼

    可以看到,terminated()方法的方法體為空,我們可以創(chuàng)建ThreadPoolExecutor的子類來重寫terminated()方法,值得Worker線程調(diào)用tryTerminate()方法時執(zhí)行我們自己定義的terminated()方法的業(yè)務(wù)邏輯。

    鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
    上一篇 2022年8月20日 09:08
    下一篇 2022年8月20日 09:08

    相關(guān)推薦

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時間:周一至周五,10:30-18:30,節(jié)假日休息