JDK數(shù)組阻塞隊(duì)列源碼深入剖析
阻塞隊(duì)列的功能
而在本篇文章所談到的阻塞隊(duì)列當(dāng)中,是在并發(fā)的情況下使用的,上面所談到的是隊(duì)列是 并發(fā)不安全 的,但是阻塞隊(duì)列在并發(fā)下情況是安全的。阻塞隊(duì)列的主要的需求如下:
- 隊(duì)列基礎(chǔ)的功能需要有,往隊(duì)列當(dāng)中放數(shù)據(jù),從隊(duì)列當(dāng)中取數(shù)據(jù)。
- 所有的隊(duì)列操作都要是 并發(fā)安全 的。
- 當(dāng)隊(duì)列滿了之后再往隊(duì)列當(dāng)中放數(shù)據(jù)的時(shí)候,線程需要被掛起,當(dāng)隊(duì)列當(dāng)中的數(shù)據(jù)被取出,讓隊(duì)列當(dāng)中有空間的時(shí)候線程需要被喚醒。
- 當(dāng)隊(duì)列空了之后再往隊(duì)列當(dāng)中取數(shù)據(jù)的時(shí)候,線程需要被掛起,當(dāng)有線程往隊(duì)列當(dāng)中加入數(shù)據(jù)的時(shí)候被掛起的線程需要被喚醒。
- 在我們實(shí)現(xiàn)的隊(duì)列當(dāng)中我們使用數(shù)組去存儲(chǔ)數(shù)據(jù),因此在構(gòu)造函數(shù)當(dāng)中需要提供數(shù)組的初始大小,設(shè)置用多大的數(shù)組。
上面就是數(shù)組阻塞隊(duì)列給我們提供的最核心的功能,其中將線程掛起和喚醒就是阻塞隊(duì)列的核心,掛起和喚醒體現(xiàn)了“阻塞”這一核心思想。
數(shù)組阻塞隊(duì)列設(shè)計(jì)
閱讀這部分內(nèi)容你需要熟悉可重入鎖 ReentrantLock 和條件變量 Condition 的使用。
數(shù)組的循環(huán)使用
因?yàn)槲覀兪鞘褂脭?shù)組存儲(chǔ)隊(duì)列當(dāng)中的數(shù)據(jù),從下表為0的位置開始,當(dāng)我們往隊(duì)列當(dāng)中加入一些數(shù)據(jù)之后,隊(duì)列的情況可能如下,其中head表示隊(duì)頭,tail表示隊(duì)尾。
在上圖的基礎(chǔ)之上我們?cè)谶M(jìn)行四次出隊(duì)操作,結(jié)果如下:
在上面的狀態(tài)下,我們繼續(xù)加入8個(gè)數(shù)據(jù),那么布局情況如下:
我們知道上圖在加入數(shù)據(jù)的時(shí)候不僅將數(shù)組后半部分的空間使用完了,而且可以繼續(xù)使用前半部分沒有使用過的空間,也就是說在隊(duì)列內(nèi)部實(shí)現(xiàn)了一個(gè)循環(huán)使用的過程。
字段設(shè)計(jì)
在JDK當(dāng)中數(shù)組阻塞隊(duì)列的實(shí)現(xiàn)是 ArrayBlockingQueue 類,在他的內(nèi)部是使用數(shù)組實(shí)現(xiàn)的,我們現(xiàn)在來看一下它的主要的字段,為了方便閱讀將所有的解釋說明都寫在的注釋當(dāng)中:
/** The queued items */ final Object[] items; // 這個(gè)就是具體存儲(chǔ)數(shù)據(jù)的數(shù)組 /** items index for next take, poll, peek or remove */ int takeIndex; // 因?yàn)槭顷?duì)列 因此我們需要知道下一個(gè)出隊(duì)的數(shù)據(jù)的下標(biāo) 這個(gè)就是表示下一個(gè)將要出隊(duì)的數(shù)據(jù)的下標(biāo) /** items index for next put, offer, or add */ int putIndex; // 我們同時(shí)也需要下一個(gè)入隊(duì)的數(shù)據(jù)的下標(biāo) /** Number of elements in the queue */ int count; // 統(tǒng)計(jì)隊(duì)列當(dāng)中一共有多少個(gè)數(shù)據(jù) /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; // 因?yàn)樽枞?duì)列是一種可以并發(fā)使用的數(shù)據(jù)結(jié)構(gòu) /** Condition for waiting takes */ private final Condition notEmpty; // 這個(gè)條件變量主要用于喚醒被 take 函數(shù)阻塞的線程 也就是從隊(duì)列當(dāng)中取數(shù)據(jù)的線程 /** Condition for waiting puts */ private final Condition notFull; // 這個(gè)條件變量主要用于喚醒被 put 函數(shù)阻塞的線程 也就是從隊(duì)列當(dāng)中放數(shù)據(jù)的線程
構(gòu)造函數(shù)
構(gòu)造函數(shù)的主要功能是申請(qǐng)指定大小的內(nèi)存空間,并且對(duì)類的成員變量進(jìn)行賦值操作。
public ArrayBlockingQueue(int capacity) { // capacity 表示用與存儲(chǔ)數(shù)據(jù)的數(shù)組的長(zhǎng)度 this(capacity, false);}// fair 這個(gè)參數(shù)主要是用于說明 是否使用公平鎖// 如果為 true 表示使用公平鎖 執(zhí)行效率低 但是各個(gè)線程進(jìn)入臨界區(qū)的順序是先來后到的順序 更加公平// 如果為 false 表示使用非公平鎖 執(zhí)行效率更高public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; // 對(duì)變量進(jìn)行賦值操作 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}
put函數(shù)
這個(gè)函數(shù)是阻塞隊(duì)列對(duì)核心的函數(shù)之一了,首先我們需要了解的是,如果一個(gè)線程調(diào)用了這個(gè)函數(shù)往隊(duì)列當(dāng)中加入數(shù)據(jù),如果此時(shí)隊(duì)列已經(jīng)滿了則線程需要被掛起,如果沒有滿則需要將數(shù)據(jù)加入到隊(duì)列當(dāng)中,也就是將數(shù)據(jù)存儲(chǔ)到數(shù)組當(dāng)中。注意還有一個(gè)很重要的一點(diǎn)是,當(dāng)我們往隊(duì)列當(dāng)中加入一個(gè)數(shù)據(jù)之后需要發(fā)一個(gè)信號(hào)給其他被 take 函數(shù)阻塞的線程,因?yàn)檫@些線程在取數(shù)據(jù)的時(shí)候可能隊(duì)列當(dāng)中已經(jīng)空了,因此需要將這些線程喚醒。
public void put(E e) throws InterruptedException { checkNotNull(e); // 保證輸入的數(shù)據(jù)不為 null 代碼在下方 final ReentrantLock lock = this.lock; // 進(jìn)行加鎖操作,因?yàn)橄旅媸桥R界區(qū) lock.lockInterruptibly(); try { while (count == items.length) // 如果隊(duì)列已經(jīng)滿了 也就是隊(duì)列當(dāng)中數(shù)據(jù)的個(gè)數(shù) count == 數(shù)組的長(zhǎng)度的話 就需要將線程掛起 notFull.await(); // 當(dāng)隊(duì)列當(dāng)中有空間的之后將數(shù)據(jù)加入到隊(duì)列當(dāng)中 這個(gè)函數(shù)在下面仔細(xì)分析 代碼在下方 enqueue(e); } finally { lock.unlock(); }}private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException();}private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; // 進(jìn)入這個(gè)函數(shù)的線程已經(jīng)在 put 函數(shù)當(dāng)中加上鎖了 因此這里不需要加鎖 final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 因?yàn)檫@個(gè)數(shù)據(jù)是循環(huán)使用的 因此可以回到下標(biāo)為0的位置 // 因?yàn)殛?duì)列當(dāng)中的數(shù)據(jù)可以出隊(duì) 因此下標(biāo)為 0 的位置不存在數(shù)據(jù)可以使用 putIndex = 0; count++; // 在這里需要將一個(gè)被 take 函數(shù)阻塞的線程喚醒 如果調(diào)用這個(gè)方法的時(shí)候沒有線程阻塞 // 那么調(diào)用這個(gè)方法相當(dāng)于沒有調(diào)用 如果有線程阻塞那么將會(huì)喚醒一個(gè)線程 notEmpty.signal();}
注意:這里有一個(gè)地方非常容易被忽略,那就是在將線程掛起的時(shí)候使用的是 while 循環(huán)而不是 if 條件語句,代碼:
final ReentrantLock lock = this.lock;lock.lockInterruptibly();try { while (count == items.length) notFull.await(); enqueue(e);} finally { lock.unlock();}
這是因?yàn)?,線程被喚醒之后并不會(huì)立馬執(zhí)行,因?yàn)榫€程在調(diào)用 await 方法的之后會(huì)釋放鎖:lock:,他想再次執(zhí)行還需要再次獲得鎖,然后就在他獲取鎖之前的這段時(shí)間里面,可能其他的線程也會(huì)從數(shù)組當(dāng)中放數(shù)據(jù),因此這個(gè)線程執(zhí)行的時(shí)候隊(duì)列可能還是滿的,因此需要再次判斷,否則就會(huì)覆蓋數(shù)據(jù),像這種喚醒之后并沒有滿足線程執(zhí)行條件的現(xiàn)象叫做 虛假喚醒 ,因此大家在寫程序的時(shí)候要格外注意,當(dāng)需要將線程掛起或者喚醒的之后,最好考慮清楚,如果不確定可以使用 while 替代 if ,這樣的話更加保險(xiǎn)。
take函數(shù)
這個(gè)函數(shù)主要是從隊(duì)列當(dāng)中取數(shù)據(jù),但是當(dāng)隊(duì)列為空的時(shí)候需要將調(diào)用這個(gè)方法的線程阻塞。當(dāng)隊(duì)列當(dāng)中有數(shù)據(jù)的時(shí)候,就可以從隊(duì)列當(dāng)中取出數(shù)據(jù),但是有一點(diǎn)很重要的就是當(dāng)從隊(duì)列當(dāng)中取出數(shù)據(jù)之后,需要調(diào)用 signal 方法,用于喚醒被 put 函數(shù)阻塞的線程,因?yàn)閺年?duì)列當(dāng)中取出數(shù)據(jù)了,隊(duì)列肯定已經(jīng)不滿了,因此可以喚醒被 put 函數(shù)阻塞的線程了。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 因?yàn)槿?shù)據(jù)的代碼涉及到數(shù)據(jù)競(jìng)爭(zhēng) 也就是說多個(gè)線程同時(shí)競(jìng)爭(zhēng) 數(shù)組數(shù)據(jù)items 因此需要用鎖保護(hù)起來 lock.lockInterruptibly(); try { // 當(dāng) count == 0 說明隊(duì)列當(dāng)中沒有數(shù)據(jù) while (count == 0) notEmpty.await(); // 當(dāng)隊(duì)列當(dāng)中還有數(shù)據(jù)的時(shí)候可以將數(shù)據(jù)出隊(duì) return dequeue(); } finally { lock.unlock(); }}private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings(“unchecked”) // 取出數(shù)據(jù) E x = (E) items[takeIndex]; items[takeIndex] = null; // 將對(duì)應(yīng)的位置設(shè)置為 null 數(shù)據(jù)就可以被垃圾收集器回收了 if (++takeIndex == items.length) takeIndex = 0; count–; // 迭代器也需要出隊(duì) 如果不了 if (itrs != null) itrs.elementDequeued(); // 調(diào)用 signal 函數(shù) 將被 put 函數(shù)阻塞的線程喚醒 如果調(diào)用這個(gè)方法的時(shí)候沒有線程阻塞 // 那么調(diào)用這個(gè)方法相當(dāng)于沒有調(diào)用 如果有線程阻塞那么將會(huì)喚醒一個(gè)線程 notFull.signal(); return x;}
同樣的道理這里也需要使用 while 循環(huán)去進(jìn)行阻塞,否則可能存在 虛假喚醒 ,可能隊(duì)列當(dāng)中沒有數(shù)據(jù)返回的數(shù)據(jù)為 null,而且會(huì)破壞隊(duì)列的結(jié)構(gòu)因?yàn)闀?huì)涉及隊(duì)列的兩個(gè)端點(diǎn)的值的改變,也就是 takeIndex 和 putIndex 的改變。
offer函數(shù)
這個(gè)函數(shù)的作用和put函數(shù)一樣,只不過當(dāng)隊(duì)列滿了的時(shí)候,這個(gè)函數(shù)返回false,加入數(shù)據(jù)成功之后這個(gè)函數(shù)返回true,下面的代碼就比較簡(jiǎn)單了。
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { // 如果隊(duì)列當(dāng)中的數(shù)據(jù)個(gè)數(shù)和數(shù)組的長(zhǎng)度相等 說明隊(duì)列滿了 直接返回 false 即可 if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); }}
add函數(shù)
這個(gè)函數(shù)和上面兩個(gè)函數(shù)的意義也是一樣的,只不過當(dāng)隊(duì)列滿了之后這個(gè)函數(shù)會(huì)拋出異常。
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException(“Queue full”);}
poll函數(shù)
這個(gè)函數(shù)和take函數(shù)的作用差不多,但是這個(gè)函數(shù)不會(huì)阻塞,當(dāng)隊(duì)列當(dāng)中沒有數(shù)據(jù)的時(shí)候直接返回null,有數(shù)據(jù)的話返回?cái)?shù)據(jù)。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); }}
總結(jié)
在本篇文章當(dāng)中主要介紹了JDK內(nèi)部是如何實(shí)現(xiàn) ArrayBlockingQueue 的,如果你對(duì)鎖和隊(duì)列的使用有一定的了解本篇文章應(yīng)該還是比較容易理解的。在實(shí)現(xiàn) ArrayBlockingQueue 當(dāng)中有以下需要注意的點(diǎn):
- put 函數(shù),如果在往隊(duì)列當(dāng)中加入數(shù)據(jù)的時(shí)候隊(duì)列滿了,則需要將線程掛起。在隊(duì)列當(dāng)中有空間之后,線程被喚醒繼續(xù)執(zhí)行,在往隊(duì)列當(dāng)中加入了數(shù)據(jù)之后,需要調(diào)用 signal 方法,喚醒被 take 函數(shù)阻塞的線程。
- take 函數(shù),如果在往隊(duì)列當(dāng)中取出數(shù)據(jù)的時(shí)候隊(duì)列空了,則需要將線程掛起。在隊(duì)列當(dāng)中有數(shù)據(jù)之后,線程被喚醒繼續(xù)執(zhí)行,在從隊(duì)列當(dāng)中取出數(shù)據(jù)之后,需要調(diào)用 signal 方法,喚醒被 put 函數(shù)阻塞的線程。
- 在調(diào)用 await 函數(shù)的時(shí)候,需要小心 虛假喚醒 現(xiàn)象