一、QuartzSchedulerThread 的 run方法大致闡述
先說一下run方法的執(zhí)行時機:
當(dāng) Quartzscheduler執(zhí)行start方法 時,方法體中有一句
schedThread.togglePause(false); ,接著就會調(diào)用 QuartzSchedulerThread 下的 togglePause 方法,將 paused 置為 false ,在此之后, QuartzSchedulerThread 下的 run 方法開始真正運行
/**通知主處理循環(huán)在下一個可能的點暫停 */ void togglePause(boolean pause) { synchronized (sigLock) { paused = pause; if (paused) { signalSchedulingChange(0); } else { sigLock.notifyAll(); } } }復(fù)制代碼 public void run() { int acquiresFailed = 0; // 這里就是判斷調(diào)度器是否該停止,如果沒有收到信號的話,這個調(diào)度器是一直處于循環(huán)之中的 while (!halted.get()) { try { // 這里是檢查我們是否應(yīng)該暫停 synchronized (sigLock) { // 我們在初始化的時候,paused 是置為 true的, // 因此在上文中,我們才說 // 當(dāng) Quartzscheduler 執(zhí)行 start方法時調(diào)用togglePause, // 將 paused 置為false,run 方法才開始運行 // 也是因為此處的判斷 while (paused && !halted.get()) { try { sigLock.wait(1000L); }catch (InterruptedException ignore) {} acquiresFailed = 0; } if (halted.get()) { break;} } // 如果從作業(yè)存儲中讀取一直失敗(例如數(shù)據(jù)庫關(guān)閉或重新啟動) // 就會等待一段時間~ if (acquiresFailed > 1) { try { //這里就是計算延遲時間 long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) {} } // 從線程池拿出空閑可利用的線程數(shù)量 // 這里多談一嘴 blockForAvailableThreads()方法 // 它是一個阻塞式方法,直到至少有一個可用線程。 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { List triggers; long now = System.currentTimeMillis(); // 清除信號調(diào)度變更 clearSignaledSchedulingChange(); try { //如果可用線程數(shù)量足夠那么就是30后再次掃描, //acquireNextTriggers方法的三個參數(shù)的意思分別是: //idleWaitTime :為如果沒有的再次掃描的時間,默認(rèn)是 // private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L; 30秒 //Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()) :這里的意思就是一次最多能取幾個出來 //batchTimeWindow :默認(rèn)是0,同樣是一個時間范圍, //如果有兩個任務(wù)只差一兩秒,而執(zhí)行線程數(shù)量滿足及batchTimeWindow時間也滿足的情況下就會兩個都取出來 // 具體的方法的執(zhí)行,后文再看~ triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; if (log.isDebugEnabled()){ //… } //在獲取到 triggers 觸發(fā)器不為空后, //trigger列表是以下次執(zhí)行時間排序查出來的 if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); //取出集合中最早執(zhí)行的觸發(fā)器 //獲取它的下一個觸發(fā)時間 long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime – now; // 判斷距離執(zhí)行時間是否大于2 毫秒 while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } //判斷是不是距離觸發(fā)事件最近的, if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // 沒有的話,就進(jìn)行阻塞,稍后進(jìn)行執(zhí)行 now = System.currentTimeMillis(); timeUntilTrigger = triggerTime – now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) {} } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime – now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to ‘executing’ List bndles = new ArrayList(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { //開始根據(jù)需要執(zhí)行的trigger從數(shù)據(jù)庫中獲取相應(yīng)的JobDetail 同時這一步也更新了 triggers 的狀態(tài),稍后會講到~ List res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( “An error occurred while firing triggers ‘” + triggers + “‘”, se); for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } //將查詢到的結(jié)果封裝成為 TriggerFiredResult for (int i = 0; i 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime – now; synchronized(sigLock) { // …. } } // …. } // while (!halted) // …. }復(fù)制代碼
二、一些細(xì)節(jié)
2.1、先獲取線程池中的可用線程數(shù)量
(若沒有可用的會阻塞,直到有可用的);
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); 復(fù)制代碼
2.2、獲取 30m 內(nèi)要執(zhí)行的 trigger
(即 acquireNextTriggers )
我們來看一看 acquireNextTriggers 方法
首先說 acquireNextTriggers 具體實現(xiàn)是在 JobStoreSupport 中,同時 quartz 與數(shù)據(jù)庫關(guān)聯(lián)的實現(xiàn)大都在 JobStoreSupport 中,當(dāng)然更具體的SQL執(zhí)行還是在 DriverDelegate 接口下的。
acquireNextTriggers 做了哪些事情呢?
我們看看這兩個方法:
首先看第一個 acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
主要就是獲取下一個 30m內(nèi)可執(zhí)行的triggers的觸發(fā)器,在里面 JobStoreSupport 從數(shù)據(jù)庫取出 triggers 時是按照 nextFireTime 排序的
更具體的就需要大家點進(jìn)方法去看啦~另外里面還包含triggers狀態(tài)的變更,屬于是更加細(xì)節(jié)化的東西。
第二個就是獲取到觸發(fā)的觸發(fā)記錄~
然后在執(zhí)行 executeInNonManagedTXLock 時,是需要先獲得鎖,之后再在提交時釋放鎖的。
待直到獲取的trigger中最先執(zhí)行的trigger在2ms內(nèi);
if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime – now; while(timeUntilTrigger > 2) { //… } }復(fù)制代碼
2.3、triggersFired(triggers)
List res = qsRsrcs.getJobStore().triggersFired(triggers);
這一步看著只是獲取了 List 對象,實際上在 triggersFired(triggers) 方法中隱藏了很多東西~
首先查詢,確保觸發(fā)器沒有被刪除、暫?;蛲瓿?#8230;,就更新 firedTrigger 的 status=STATE_EXECUTING ;代碼的注釋上還說,如果沒有這些就會將狀態(tài)該為 deleted
另外就是更新觸發(fā)觸發(fā)器:
如果下一次的執(zhí)行時間為空,狀態(tài)則改為 STATE_COMPLETE
在執(zhí)行 executeInNonManagedTXLock 方法時,提交前先獲得鎖, transOwner = getLockHandler().obtainLock(conn, lockName);
最后是釋放鎖: commitConnection(conn);
2.4、創(chuàng)建JobRunShell,放進(jìn)線程池執(zhí)行
針對每個要執(zhí)行的trigger,創(chuàng)建JobRunShell,并放入線程池執(zhí)行:
然后由execute:執(zhí)行job
更詳細(xì)的看不下去啦~
來源:https://juejin.cn/post/7131671438901116935