一、AQS 簡介
AQS,就是 AbstractQueuedSynchronizer,在同步組件的實現(xiàn)中,AQS是核心部分,同步組件的實現(xiàn)者通過使用AQS提供的模板方法實現(xiàn)同步組件語義,AQS則實現(xiàn)了對同步狀態(tài)的管理,以及對阻塞線程進行排隊,等待通知等等一些底層的實現(xiàn)處理。AQS的核心也包括了這些方面:同步隊列,獨占式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖,超時等待鎖獲取這些特性的實現(xiàn),而這些實際上則是AQS提供出來的模板方法,歸納整理如下:
AbstractQueuedSynchronizer 的獨占式鎖方法如下:
// 獨占式獲取同步狀態(tài),如果獲取失敗則插入同步隊列進行等待;public final void acquire(int arg) // 與acquire方法相同,但在同步隊列中進行等待的時候可以檢測中斷;public final void acquireInterruptibly(int arg)// 在acquireInterruptibly基礎(chǔ)上增加了超時等待功能,在超時時間內(nèi)沒有獲得同步狀態(tài)返回false;public final boolean tryAcquireNanos(int arg, long nanosTimeout)// 釋放同步狀態(tài),該方法會喚醒在同步隊列中的下一個節(jié)點public final boolean release(int arg)
AbstractQueuedSynchronizer 的共享式鎖方法如下:
// 共享式獲取同步狀態(tài),與獨占式的區(qū)別在于同一時刻有多個線程獲取同步狀態(tài);public final void acquireShared(int arg)// 在acquireShared方法基礎(chǔ)上增加了能響應中斷的功能;public final void acquireSharedInterruptibly(int arg)// 在acquireSharedInterruptibly基礎(chǔ)上增加了超時等待的功能;public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)// 共享式釋放同步狀態(tài)public final boolean releaseShared(int arg)
二、同步隊列
先要了解一下AQS中的同步隊列。
當共享資源被某個線程占有,其他請求該資源的線程就會被阻塞,從而進入同步隊列。
就數(shù)據(jù)結(jié)構(gòu)而言,隊列的實現(xiàn)就是數(shù)組或鏈表的形式。AQS 中的同步隊列是通過鏈表實現(xiàn)的。
在AQS 中有一個靜態(tài)內(nèi)部類:
static final class Node {//指示節(jié)點正在共享模式下等待的標記 static final Node SHARED = new Node();//指示節(jié)點正在獨占模式下等待的標記 static final Node EXCLUSIVE = null;//節(jié)點從同步隊列中取消 static final int CANCELLED = 1;//后繼節(jié)點的線程處于等待狀態(tài),如果當前節(jié)點釋放同步狀態(tài)會通知后繼節(jié)點,使得后繼節(jié)點的線程能夠運行; static final int SIGNAL = -1;//當前節(jié)點進入等待隊列中 static final int CONDITION = -2;//表示下一次共享式同步狀態(tài)獲取將會無條件傳播下去 static final int PROPAGATE = -3; //節(jié)點狀態(tài) volatile int waitStatus;//當前節(jié)點(線程)的前驅(qū)節(jié)點 volatile Node prev;//當前節(jié)點(線程)的后繼節(jié)點 volatile Node next;//加入同步隊列的線程引用 volatile Thread thread;//等待隊列中的下一個節(jié)點 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }}
從上面可以看出來,AQS 的同步隊列有前驅(qū)結(jié)點和后續(xù)節(jié)點,它是一個雙向鏈表。下面看一個Demo
public class Test { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); MyThread myThread = new MyThread(lock); for (int i = 0; i < 5; i++) { new Thread(myThread).start(); } }} class MyThread implements Runnable { private final ReentrantLock lock; static int count; MyThread(ReentrantLock lock) { this.lock = lock; } @Override public void run() { lock.lock(); System.out.println("count == " + count++); lock.unlock(); }}
實例代碼中開啟了5個線程,先獲取鎖之后再睡眠10S中,實際上這里讓線程睡眠是想模擬出當線程無法獲取鎖時進入同步隊列的情況。通過debug,當Thread-4(在本例中最后一個線程)獲取鎖失敗后進入同步時,AQS時現(xiàn)在的同步隊列如圖所示:
Thread-0先獲得鎖后進行睡眠,其他線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進入同步隊列,同時也可以很清楚的看出來每個節(jié)點有兩個域:prev(前驅(qū))和next(后繼),并且每個節(jié)點用來保存獲取同步狀態(tài)失敗的線程引用以及等待狀態(tài)等信息。另外AQS中有兩個重要的成員變量:
private transient volatile Node head;private transient volatile Node tail;
也就是說AQS實際上通過頭尾指針來管理同步隊列,同時實現(xiàn)包括獲取鎖失敗的線程進行入隊,釋放鎖時對同步隊列中的線程進行通知等核心方法。其示意圖如下:
通過對源碼的理解以及做實驗的方式,現(xiàn)在我們可以清楚的知道這樣幾點:
- 節(jié)點的數(shù)據(jù)結(jié)構(gòu),即AQS的靜態(tài)內(nèi)部類Node,節(jié)點的等待狀態(tài)等信息;
- 同步隊列是一個雙向隊列,AQS通過持有頭尾指針管理同步隊列;
那么,節(jié)點如何進行入隊和出隊是怎樣做的了?實際上這對應著鎖的獲取和釋放兩個操作:獲取鎖失敗進行入隊操作,獲取鎖成功進行出隊操作。
三、獨占鎖
1. 獨占鎖的獲取
獨占鎖的獲?。╝cquire方法)
繼續(xù)通過看源碼和debug的方式來看,還是以上面的demo為例,調(diào)用lock()方法是獲取獨占式鎖,獲取失敗就將當前線程加入同步隊列,成功則線程執(zhí)行。而lock()方法實際上會調(diào)用AQS的acquire()方法,源碼如下:
public final void acquire(int arg) { //先看同步狀態(tài)是否獲取成功,如果成功則方法結(jié)束返回 //若失敗則先調(diào)用addWaiter()方法,再調(diào)用acquireQueued()方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
acquire根據(jù)當前獲得同步狀態(tài)成功與否做了兩件事情:
1.1 獲取同步狀態(tài)失敗,入隊操作
當線程獲取獨占式鎖失敗后就會將當前線程加入同步隊列,那么加入隊列的方式是怎樣的了?我們接下來就應該去研究一下addWaiter()和acquireQueued()。addWaiter() 源碼如下:
private Node addWaiter(Node mode) { // 1. 將當前線程構(gòu)建成Node類型 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 2. 當前尾節(jié)點是否為null Node pred = tail; if (pred != null) { // 2.2 將當前節(jié)點尾插入的方式插入同步隊列中 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 2.1. 當前同步隊列尾節(jié)點為null,說明當前線程是第一個加入同步隊列進行等待的線程 enq(node); return node;}
分析可以看上面的注釋。程序的邏輯主要分為兩個部分:
另外還會有另外一個問題:如果 if (compareAndSetTail(pred, node))為false怎么辦?會繼續(xù)執(zhí)行到enq()方法,同時很明顯compareAndSetTail是一個CAS操作,通常來說如果CAS操作失敗會繼續(xù)自旋(死循環(huán))進行重試。
因此,經(jīng)過我們這樣的分析,enq()方法可能承擔兩個任務:
那么是不是真的就像我們分析的一樣了?只有源碼會告訴我們答案,enq()源碼如下:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //1. 構(gòu)造頭結(jié)點 if (compareAndSetHead(new Node())) tail = head; } else { // 2. 尾插入,CAS操作失敗自旋嘗試 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
在上面的分析中我們可以看出在第1步中會先創(chuàng)建頭結(jié)點,說明同步隊列是帶頭結(jié)點的鏈式存儲結(jié)構(gòu)。帶頭結(jié)點與不帶頭結(jié)點相比,會在入隊和出隊的操作中獲得更大的便捷性,因此同步隊列選擇了帶頭結(jié)點的鏈式存儲結(jié)構(gòu)。那么帶頭節(jié)點的隊列初始化時機是什么?自然而然是在tail為null時,即當前線程是第一次插入同步隊列。compareAndSetTail(t, node)方法會利用CAS操作設(shè)置尾節(jié)點,如果CAS操作失敗會在for (;;)for死循環(huán)中不斷嘗試,直至成功return返回為止。因此,對enq()方法可以做這樣的總結(jié):
現(xiàn)在我們已經(jīng)很清楚獲取獨占式鎖失敗的線程包裝成Node然后插入同步隊列的過程了。那么緊接著會有下一個問題:在同步隊列中的節(jié)點(線程)會做什么事情來保證自己能夠有機會獲得獨占式鎖了?帶著這樣的問題我們就來看看acquireQueued()方法,從方法名就可以很清楚,這個方法的作用就是排隊獲取鎖的過程,源碼如下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 1. 獲得當前節(jié)點的先驅(qū)節(jié)點 final Node p = node.predecessor(); // 2. 當前節(jié)點能否獲取獨占式鎖 // 2.1 如果當前節(jié)點的先驅(qū)節(jié)點是頭結(jié)點并且成功獲取同步狀態(tài),即可以獲得獨占式鎖 if (p == head && tryAcquire(arg)) { //隊列頭指針用指向當前節(jié)點 setHead(node); //釋放前驅(qū)節(jié)點 p.next = null; // help GC failed = false; return interrupted; } // 2.2 獲取鎖失敗,線程進入等待狀態(tài)等待獲取獨占式鎖 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
程序邏輯通過注釋已經(jīng)標出,整體來看這又是一個自旋的過程(for循環(huán)),代碼首先獲取當前節(jié)點的先驅(qū)節(jié)點,如果先驅(qū)節(jié)點是頭結(jié)點的并且成功獲得同步狀態(tài)的時候(if (p == head && tryAcquire(arg))),當前節(jié)點所指向的線程能夠獲取鎖。反之,獲取鎖失敗進入等待狀態(tài)。整體示意圖為下圖:
1.2 獲取鎖成功,出隊操作
獲取鎖的節(jié)點出隊的邏輯是:
//隊列頭結(jié)點引用指向當前節(jié)點setHead(node);//釋放前驅(qū)節(jié)點p.next = null; // help GCfailed = false;return interrupted;
setHead() 方法為:
private void setHead(Node node) { head = node; node.thread = null; node.prev = null;}
將當前節(jié)點通過setHead()方法設(shè)置為隊列的頭結(jié)點,然后將之前的頭結(jié)點的next域設(shè)置為null并且pre域也為null,即與隊列斷開,無任何引用方便GC時能夠?qū)?nèi)存進行回收。示意圖如下:
那么當獲取鎖失敗的時候會調(diào)用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他們做了什么事情。shouldParkAfterFailedAcquire()方法源碼為:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don’t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
shouldParkAfterFailedAcquire()方法主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS將節(jié)點狀態(tài)由INITIAL設(shè)置成SIGNAL,表示當前線程阻塞。當compareAndSetWaitStatus設(shè)置失敗則說明shouldParkAfterFailedAcquire方法返回false,然后會在acquireQueued()方法中for (;;)死循環(huán)中會繼續(xù)重試,直至compareAndSetWaitStatus設(shè)置節(jié)點狀態(tài)位為SIGNAL時shouldParkAfterFailedAcquire返回true時才會執(zhí)行方法parkAndCheckInterrupt()方法,該方法的源碼為:
private final boolean parkAndCheckInterrupt() { //使得該線程阻塞LockSupport.park(this); return Thread.interrupted();}
該方法的關(guān)鍵是會調(diào)用LookSupport.park()方法(關(guān)于LookSupport會在以后的文章進行討論),該方法是用來阻塞當前線程的。因此到這里就應該清楚了,acquireQueued()在自旋過程中主要完成了兩件事情:
經(jīng)過上面的分析,獨占式鎖的獲取過程也就是acquire()方法的執(zhí)行流程如下圖所示:
2. 獨占鎖的釋放
獨占鎖的釋放(release()方法)
ReentrantLock 的 unlock() 方法,調(diào)用的是 AQS 的 release() 方法。下面看一下
獨占鎖的釋放就相對來說比較容易理解了,先來看下源碼:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}
這段代碼邏輯就比較容易理解了,如果同步狀態(tài)釋放成功(tryRelease返回true)則會執(zhí)行if塊中的代碼,當head指向的頭結(jié)點不為null,并且該節(jié)點的狀態(tài)值不為0的話才會執(zhí)行unparkSuccessor()方法。unparkSuccessor方法源碼:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null)//后繼節(jié)點不為null時喚醒該線程 LockSupport.unpark(s.thread);}
源碼的關(guān)鍵信息請看注釋,首先獲取頭節(jié)點的后繼節(jié)點,當后繼節(jié)點不為空的時候會調(diào)用LookSupport.unpark()方法,該方法會喚醒該節(jié)點的后繼節(jié)點所包裝的線程。因此,每一次鎖釋放后就會喚醒隊列中該節(jié)點的后繼節(jié)點所引用的線程,從而進一步可以佐證獲得鎖的過程是一個FIFO(先進先出)的過程。
到現(xiàn)在我們終于啃下了一塊硬骨頭了,通過學習源碼的方式非常深刻的學習到了獨占式鎖的獲取和釋放的過程以及同步隊列??梢宰鲆幌驴偨Y(jié):
總體來說:在獲取同步狀態(tài)時,AQS維護一個同步隊列,獲取同步狀態(tài)失敗的線程會加入到隊列中進行自旋;移除隊列(或停止自旋)的條件是前驅(qū)節(jié)點是頭結(jié)點并且成功獲得了同步狀態(tài)。在釋放同步狀態(tài)時,同步器會調(diào)用unparkSuccessor() 方法喚醒后繼節(jié)點。
3. 可中斷式獲取鎖
可中斷式獲取鎖(acquireInterruptibly方法)
我們知道lock相較于synchronized有一些更方便的特性,比如能響應中斷以及超時等待等特性,現(xiàn)在我們依舊采用通過學習源碼的方式來看看能夠響應中斷是怎么實現(xiàn)的??身憫袛嗍芥i可調(diào)用方法lock.lockInterruptibly();而該方法其底層會調(diào)用AQS的acquireInterruptibly方法,源碼為:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) //線程獲取鎖失敗 doAcquireInterruptibly(arg);}
在獲取同步狀態(tài)失敗后就會調(diào)用doAcquireInterruptibly方法:
private void doAcquireInterruptibly(int arg) throws InterruptedException {//將節(jié)點插入到同步隊列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //獲取鎖出隊if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//線程中斷拋異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
關(guān)鍵信息請看注釋,現(xiàn)在看這段代碼就很輕松了吧,與acquire方法邏輯幾乎一致,唯一的區(qū)別是當parkAndCheckInterrupt返回true時,即線程阻塞時該線程被中斷,代碼拋出被中斷異常。
4. 超時等待式獲取鎖
超時等待式獲取鎖(tryAcquireNanos()方法)
通過調(diào)用lock.tryLock(timeout,TimeUnit)方式達到超時等待獲取鎖的效果,該方法會在三種情況下才會返回:
我們?nèi)匀煌ㄟ^采取閱讀源碼的方式來學習底層具體是怎么實現(xiàn)的,該方法會調(diào)用AQS的方法tryAcquireNanos(),源碼為:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) ||//實現(xiàn)超時等待的效果 doAcquireNanos(arg, nanosTimeout);}
很顯然這段源碼最終是靠doAcquireNanos方法實現(xiàn)超時等待的效果,該方法源碼如下:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false;//1. 根據(jù)超時時間和當前時間計算出截止時間 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor();//2. 當前線程獲得鎖出隊列 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; }// 3.1 重新計算超時時間 nanosTimeout = deadline – System.nanoTime(); // 3.2 已經(jīng)超時返回falseif (nanosTimeout spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 3.4 線程被中斷拋出被中斷異常if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
程序邏輯如圖所示:
程序邏輯同獨占鎖和響應中斷式獲取基本一致,唯一的不同在于獲取鎖失敗后,對超時時間的處理上,在第1步會先計算出按照現(xiàn)在時間和超時時間計算出理論上的截止時間,比如當前時間是08:10,超時時間是10min,那么根據(jù)deadline = System.nanoTime() + nanosTimeout計算出剛好達到超時時間時的系統(tǒng)時間就是08:10+10min = 08:20。然后根據(jù)deadline – System.nanoTime()就可以判斷是否已經(jīng)超時了,比如,當前系統(tǒng)時間是08:20很明顯已經(jīng)超過了理論上的系統(tǒng)時間08:20,deadline – System.nanoTime()計算出來就是一個負數(shù),自然而然會在3.2步中的If判斷之間返回false。如果還沒有超時即3.2步中的if判斷為true時就會繼續(xù)執(zhí)行3.3步通過LockSupport.parkNanos使得當前線程阻塞,同時在3.4步增加了對中斷的檢測,若檢測出被中斷直接拋出被中斷異常。
5. 公平鎖和非公平鎖
還是看上面同步隊列中的例子,以 ReentrantLock 的 lock() 方法為例。
public void lock() { sync.lock();}
可以看一下這里的 sync 是有兩個實現(xiàn)的:公平和非公平
5.1 公平鎖
進入 AQS 的 acquire() 方法,這個方法進去之后會調(diào)用 AQS 實現(xiàn)類的 tryAcquire() 方法。tryAcquire() 方法分為公平鎖和非公平鎖的實現(xiàn):
下面看一下公平鎖的實現(xiàn)(也就是 FairSync 的實現(xiàn)):
static final class FairSync extends Sync { //… protected final boolean tryAcquire(int acquires) { // 獲取當前線程 final Thread current = Thread.currentThread(); // 獲取當前線程的狀態(tài),大于0則是獲取到鎖的(1代表獲取一次鎖,2代表重入了一次),等于0就是沒獲取到鎖 int c = getState(); // 如果當前狀態(tài)沒獲取鎖 if (c == 0) { // 這里先判斷當前隊列是否為空,如果為空,當前隊列就可以直接獲取鎖 // 如果當前隊列不為空,則后面直接返回false,讓線程走上面分析的獨占鎖的獲取流程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果獲取到鎖的就是當前的線程,則直接重入一次,就是 state+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }}
這里主要是 !hasQueuedPredecessors() 判斷了當前隊列是否為空,為空就加入隊列尾部等待,所以稱之為公平鎖。
5.2 非公平鎖
可以看到非公平鎖的實現(xiàn),上來就直接嘗試獲取鎖(就是compareAndSetState(0, 1)),如果獲取不到,就調(diào)用 AQS 的 acquire()方法,這里進入 AQS 的 acquire() 方法之后,再看非公平鎖的tryAcquire(arg) 實現(xiàn)
static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }} abstract static class Sync extends AbstractQueuedSynchronizer { // … final boolean nonfairTryAcquire(int acquires) { // 獲取當前線程 final Thread current = Thread.currentThread(); // 獲取當前線程的狀態(tài),大于0則是獲取到鎖的(1代表獲取一次鎖,2代表重入了一次),等于0就是沒獲取到鎖 int c = getState(); // 如果當前狀態(tài)沒獲取鎖 if (c == 0) { // 這里就不會像公平鎖一樣去判斷隊列是否為空了,直接回嘗試搶占鎖。 // 如果搶到了就獲取鎖,沒搶到才會加入隊列尾部進行排隊 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果獲取到鎖的就是當前的線程,則直接重入一次,就是 state+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
四、共享鎖
共享鎖就需要看一下 ReentrantReadWriteLock 類了,這個是一個讀寫鎖,它的讀鎖就是共享鎖,寫鎖就是互斥鎖:讀讀共享、讀寫互斥,寫寫互斥。
ReentrantReadWriteLock 的 API 介紹
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();// 獲取讀鎖ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();readLock.lock();readLock.unlock();// 獲取寫鎖ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();writeLock.lock();writeLock.unlock();
1. 共享鎖的獲取
共享鎖的獲?。╝cquireShared()方法)
可以看看上面對于讀寫鎖的 API 介紹,看看 readLock.lock(); 和 writeLock.lock(); 的源碼可以發(fā)現(xiàn):
- readLock.lock(); 調(diào)用的是 AQS 的 acquireShared() 方法,也就是獲取共享鎖
- writeLock.lock(); 調(diào)用的是 AQS 的 acquire() 方法,也就是互斥鎖
下面看一下共享鎖是怎么獲取的,直接上 acquireShared() 方法的源碼:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);}
這段源碼的邏輯很容易理解,在該方法中會首先調(diào)用tryAcquireShared方法,tryAcquireShared返回值是一個int類型,當返回值為大于等于0的時候方法結(jié)束說明獲得成功獲取鎖,否則,表明獲取同步狀態(tài)失敗即所引用的線程獲取鎖失敗,會執(zhí)行doAcquireShared方法,該方法的源碼為:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 當該節(jié)點的前驅(qū)節(jié)點是頭結(jié)點且成功獲取同步狀態(tài) setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
現(xiàn)在來看這段代碼會不會很容易了?邏輯幾乎和獨占式鎖的獲取一模一樣,這里的自旋過程中能夠退出的條件是當前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點并且tryAcquireShared(arg)返回值大于等于0即能成功獲得同步狀態(tài)。
這里以公平鎖為例,看一下 tryAcquireShared(arg) 的源碼:
static final class FairSync extends Sync {// … protected int tryAcquireShared(int acquires) { for (;;) { // 隊列中有等待線程則不獲取鎖,返回 -1 if (hasQueuedPredecessors()) return -1; // 獲取狀態(tài) int available = getState(); int remaining = available – acquires; // 嘗試設(shè)置狀態(tài),但是這里獲取鎖的時候并不會給 state 加一,因為是共享鎖 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }}
這里和獨占鎖有一個很重要的區(qū)別,就是在獨占鎖中是調(diào)用了 AQS 的 acquireQueued 方法去排隊等待獲取鎖,而共享鎖只是將當前線程加入了隊列中,然后調(diào)用 tryAcquireShared(arg) 嘗試獲取鎖,并且共享鎖并不會給 state 加 1。
而互斥鎖,就是這里的寫鎖會給 state 加 1 ,代表當前是有互斥鎖的。
總結(jié)
- 互斥鎖會給 AQS 的 state 從 0 改為 1(代表當前線程獲取到鎖),如果重入鎖,state 會累加 1 ,釋放鎖 state 就會累減 1,減到 0 后就完全釋放鎖了
- 共享鎖并不會給 AQS 的 state 加一或減一,并且共享鎖加入隊列后并不會一直自旋等待獲取鎖,而是直接看 state 是否為 0,為 0 代表沒有寫鎖了,不為 0 就代表在它前面還有寫鎖。讀寫互斥
2. 共享鎖的釋放
共享鎖的釋放(releaseShared()方法)
共享鎖的釋放在AQS中會調(diào)用方法 releaseShared:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
當成功釋放同步狀態(tài)之后即tryReleaseShared會繼續(xù)執(zhí)行doReleaseShared方法:
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; }}
這段方法跟獨占式鎖釋放過程有點不同,在共享式鎖的釋放過程中,對于能夠支持多個線程同時訪問的并發(fā)組件,必須保證多個線程能夠安全的釋放同步狀態(tài),這里采用的CAS保證,當CAS操作失敗continue,在下一次循環(huán)中進行重試。
3. 可中斷和超時等待
可中斷(acquireSharedInterruptibly()方法),超時等待(tryAcquireSharedNanos()方法)
關(guān)于可中斷鎖以及超時等待的特性其實現(xiàn)和獨占式鎖可中斷獲取鎖以及超時等待的實現(xiàn)幾乎一致,具體的就不再說了,如果理解了上面的內(nèi)容對這部分的理解也是水到渠成的。
五、鎖
鎖升級和鎖降級不是一回事,鎖升級是synchronized關(guān)鍵字在jdk1.6之后做的優(yōu)化,鎖降級是為了保證數(shù)據(jù)的可見性在添加了寫鎖后再添加一道讀鎖
1. 鎖升級
鎖升級的順序為:無鎖 -> 偏向鎖 -> 輕量級鎖 -> 重量級鎖,且鎖升級的順序是不可逆的。
因為操作喚醒阻塞的線程需從用戶態(tài)切換到內(nèi)存態(tài),開銷大,所以才優(yōu)先使用輕量級鎖進行循環(huán)判斷是否可獲取監(jiān)視類或?qū)ο蟆?/p>
線程第一次獲取鎖獲時鎖的狀態(tài)為偏向鎖,如果下次還是這個線程獲取鎖,則鎖的狀態(tài)不變,否則會升級為CAS輕量級鎖;如果還有線程競爭獲取鎖,如果線程獲取到了輕量級鎖沒啥事了,如果沒獲取到會自旋,自旋期間獲取到了鎖沒啥事,超過了10次還沒獲取到鎖,鎖就升級為重量級的鎖,此時如果其他線程沒獲取到重量級鎖,就會被阻塞等待喚起,此時效率就低了。
2. 鎖降級
鎖降級:寫鎖降級成為讀鎖。持有寫鎖的同時,獲取到讀鎖,然后釋放寫鎖。避免讀到被其他線程修改的數(shù)據(jù)。
oracle官網(wǎng)鎖降級示例:
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); // 緩存無效 if (!cacheValid) { // Must release read lock before acquiring write lock // 釋放讀鎖 rwl.readLock().unlock(); // 嘗試獲取寫鎖 rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did. // 再次判斷獲取是否無效 if (!cacheValid) { // 獲取數(shù)據(jù) data = … cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock // 鎖降級 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read } } // 經(jīng)過很長的時間做一些處理、而且不想我剛剛自己更新的數(shù)據(jù)被別人改了 try { use(data); } finally { rwl.readLock().unlock(); } } }
六、Synchronized 和 Lock 區(qū)別
百度抄一把!??!
存在層面:
synchronized: 是Java 中的一個關(guān)鍵字,存在于 JVM 層面
Lock: 是 Java 中的一個接口
鎖的釋放條件:
synchronized:1、以獲取鎖的線程執(zhí)行完同步代碼,釋放鎖 2、線程執(zhí)行發(fā)生異常,jvm會讓線程釋放鎖
Lock:在finally中必須釋放鎖,不然容易造成線程死鎖
鎖的獲取:
synchronized: 在發(fā)生異常時候會自動釋放占有的鎖,因此不會出現(xiàn)死鎖
Lock: 發(fā)生異常時候,不會主動釋放占有的鎖,必須手動unlock來釋放鎖,可能引起死鎖的發(fā)生
鎖的狀態(tài):
synchronized:無法判斷
Lock:可以判斷
鎖的類型:
synchronized: 可重入 不可中斷 非公平
Lock:可重入 可判斷 可公平(兩者皆可)
性能:
synchronized: 少量同步
Lock:大量同步
- Lock 可以提高多個線程進行讀操作的效率。(可以通過 readwritelock 實現(xiàn)讀寫分離)
- 在資源競爭不是很激烈的情況下,Synchronized 的性能要優(yōu)于 ReetrantLock,但是在資源競爭很激烈的情況下,Synchronized 的性能會下降幾十倍,但是 ReetrantLock 的性能能維持常態(tài);
- ReentrantLock 提供了多樣化的同步,比如有時間限制的同步,可以被 Interrupt 的同步(synchronized 的同步是不能 Interrupt 的)等。在資源競爭不激烈的情形下,性能稍微比synchronized 差點點。但是當同步非常激烈的時候,synchronized 的性能一下子能下降好幾十倍。而ReentrantLock 確還能維持常態(tài)。
調(diào)度:
synchronized:使用 Object 對象本身的 wait 、notify、notifyAll調(diào)度機制
Lock:可以使用 Condition 進行線程之間的調(diào)度
用法:
synchronized:在需要同步的對象中加入此控制,synchronized可以加在方法上,也可以加在特定代碼塊中,括號中表示需要鎖的對象。
Lock:一般使用ReentrantLock類做為鎖。在加鎖和解鎖處需要通過lock()和unlock()顯示指出。所以一般會在finally塊中寫unlock()以防死鎖。
底層實現(xiàn):
synchronized:底層使用指令碼方式來控制鎖的,映射成字節(jié)碼指令就是增加來兩個指令:monitorenter和monitorexit。當線程執(zhí)行遇到 monitorenter 指令時會嘗試獲取內(nèi)置鎖,如果獲取鎖則鎖計數(shù)器+1,如果沒有獲取鎖則阻塞;當遇到 monitorexit 指令時鎖計數(shù)器-1,如果計數(shù)器為0則釋放鎖。
Lock:底層是CAS樂觀鎖,依賴AbstractQueuedSynchronizer類,把所有的請求線程構(gòu)成一個CLH隊列。而對該隊列的操作均通過Lock-Free(CAS)操作。