JDK數(shù)組阻塞隊列源碼深入剖析
阻塞隊列的功能
而在本篇文章所談到的阻塞隊列當中,是在并發(fā)的情況下使用的,上面所談到的是隊列是 并發(fā)不安全 的,但是阻塞隊列在并發(fā)下情況是安全的。阻塞隊列的主要的需求如下:
- 隊列基礎(chǔ)的功能需要有,往隊列當中放數(shù)據(jù),從隊列當中取數(shù)據(jù)。
- 所有的隊列操作都要是 并發(fā)安全 的。
- 當隊列滿了之后再往隊列當中放數(shù)據(jù)的時候,線程需要被掛起,當隊列當中的數(shù)據(jù)被取出,讓隊列當中有空間的時候線程需要被喚醒。
- 當隊列空了之后再往隊列當中取數(shù)據(jù)的時候,線程需要被掛起,當有線程往隊列當中加入數(shù)據(jù)的時候被掛起的線程需要被喚醒。
- 在我們實現(xiàn)的隊列當中我們使用數(shù)組去存儲數(shù)據(jù),因此在構(gòu)造函數(shù)當中需要提供數(shù)組的初始大小,設置用多大的數(shù)組。
上面就是數(shù)組阻塞隊列給我們提供的最核心的功能,其中將線程掛起和喚醒就是阻塞隊列的核心,掛起和喚醒體現(xiàn)了“阻塞”這一核心思想。
數(shù)組阻塞隊列設計
閱讀這部分內(nèi)容你需要熟悉可重入鎖 ReentrantLock 和條件變量 Condition 的使用。
數(shù)組的循環(huán)使用
因為我們是使用數(shù)組存儲隊列當中的數(shù)據(jù),從下表為0的位置開始,當我們往隊列當中加入一些數(shù)據(jù)之后,隊列的情況可能如下,其中head表示隊頭,tail表示隊尾。
在上圖的基礎(chǔ)之上我們在進行四次出隊操作,結(jié)果如下:
在上面的狀態(tài)下,我們繼續(xù)加入8個數(shù)據(jù),那么布局情況如下:
我們知道上圖在加入數(shù)據(jù)的時候不僅將數(shù)組后半部分的空間使用完了,而且可以繼續(xù)使用前半部分沒有使用過的空間,也就是說在隊列內(nèi)部實現(xiàn)了一個循環(huán)使用的過程。
字段設計
在JDK當中數(shù)組阻塞隊列的實現(xiàn)是 ArrayBlockingQueue 類,在他的內(nèi)部是使用數(shù)組實現(xiàn)的,我們現(xiàn)在來看一下它的主要的字段,為了方便閱讀將所有的解釋說明都寫在的注釋當中:
/** The queued items */ final Object[] items; // 這個就是具體存儲數(shù)據(jù)的數(shù)組 /** items index for next take, poll, peek or remove */ int takeIndex; // 因為是隊列 因此我們需要知道下一個出隊的數(shù)據(jù)的下標 這個就是表示下一個將要出隊的數(shù)據(jù)的下標 /** items index for next put, offer, or add */ int putIndex; // 我們同時也需要下一個入隊的數(shù)據(jù)的下標 /** Number of elements in the queue */ int count; // 統(tǒng)計隊列當中一共有多少個數(shù)據(jù) /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; // 因為阻塞隊列是一種可以并發(fā)使用的數(shù)據(jù)結(jié)構(gòu) /** Condition for waiting takes */ private final Condition notEmpty; // 這個條件變量主要用于喚醒被 take 函數(shù)阻塞的線程 也就是從隊列當中取數(shù)據(jù)的線程 /** Condition for waiting puts */ private final Condition notFull; // 這個條件變量主要用于喚醒被 put 函數(shù)阻塞的線程 也就是從隊列當中放數(shù)據(jù)的線程
構(gòu)造函數(shù)
構(gòu)造函數(shù)的主要功能是申請指定大小的內(nèi)存空間,并且對類的成員變量進行賦值操作。
public ArrayBlockingQueue(int capacity) { // capacity 表示用與存儲數(shù)據(jù)的數(shù)組的長度 this(capacity, false);}// fair 這個參數(shù)主要是用于說明 是否使用公平鎖// 如果為 true 表示使用公平鎖 執(zhí)行效率低 但是各個線程進入臨界區(qū)的順序是先來后到的順序 更加公平// 如果為 false 表示使用非公平鎖 執(zhí)行效率更高public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; // 對變量進行賦值操作 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}
put函數(shù)
這個函數(shù)是阻塞隊列對核心的函數(shù)之一了,首先我們需要了解的是,如果一個線程調(diào)用了這個函數(shù)往隊列當中加入數(shù)據(jù),如果此時隊列已經(jīng)滿了則線程需要被掛起,如果沒有滿則需要將數(shù)據(jù)加入到隊列當中,也就是將數(shù)據(jù)存儲到數(shù)組當中。注意還有一個很重要的一點是,當我們往隊列當中加入一個數(shù)據(jù)之后需要發(fā)一個信號給其他被 take 函數(shù)阻塞的線程,因為這些線程在取數(shù)據(jù)的時候可能隊列當中已經(jīng)空了,因此需要將這些線程喚醒。
public void put(E e) throws InterruptedException { checkNotNull(e); // 保證輸入的數(shù)據(jù)不為 null 代碼在下方 final ReentrantLock lock = this.lock; // 進行加鎖操作,因為下面是臨界區(qū) lock.lockInterruptibly(); try { while (count == items.length) // 如果隊列已經(jīng)滿了 也就是隊列當中數(shù)據(jù)的個數(shù) count == 數(shù)組的長度的話 就需要將線程掛起 notFull.await(); // 當隊列當中有空間的之后將數(shù)據(jù)加入到隊列當中 這個函數(shù)在下面仔細分析 代碼在下方 enqueue(e); } finally { lock.unlock(); }}private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException();}private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; // 進入這個函數(shù)的線程已經(jīng)在 put 函數(shù)當中加上鎖了 因此這里不需要加鎖 final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 因為這個數(shù)據(jù)是循環(huán)使用的 因此可以回到下標為0的位置 // 因為隊列當中的數(shù)據(jù)可以出隊 因此下標為 0 的位置不存在數(shù)據(jù)可以使用 putIndex = 0; count++; // 在這里需要將一個被 take 函數(shù)阻塞的線程喚醒 如果調(diào)用這個方法的時候沒有線程阻塞 // 那么調(diào)用這個方法相當于沒有調(diào)用 如果有線程阻塞那么將會喚醒一個線程 notEmpty.signal();}
注意:這里有一個地方非常容易被忽略,那就是在將線程掛起的時候使用的是 while 循環(huán)而不是 if 條件語句,代碼:
final ReentrantLock lock = this.lock;lock.lockInterruptibly();try { while (count == items.length) notFull.await(); enqueue(e);} finally { lock.unlock();}
這是因為,線程被喚醒之后并不會立馬執(zhí)行,因為線程在調(diào)用 await 方法的之后會釋放鎖:lock:,他想再次執(zhí)行還需要再次獲得鎖,然后就在他獲取鎖之前的這段時間里面,可能其他的線程也會從數(shù)組當中放數(shù)據(jù),因此這個線程執(zhí)行的時候隊列可能還是滿的,因此需要再次判斷,否則就會覆蓋數(shù)據(jù),像這種喚醒之后并沒有滿足線程執(zhí)行條件的現(xiàn)象叫做 虛假喚醒 ,因此大家在寫程序的時候要格外注意,當需要將線程掛起或者喚醒的之后,最好考慮清楚,如果不確定可以使用 while 替代 if ,這樣的話更加保險。
take函數(shù)
這個函數(shù)主要是從隊列當中取數(shù)據(jù),但是當隊列為空的時候需要將調(diào)用這個方法的線程阻塞。當隊列當中有數(shù)據(jù)的時候,就可以從隊列當中取出數(shù)據(jù),但是有一點很重要的就是當從隊列當中取出數(shù)據(jù)之后,需要調(diào)用 signal 方法,用于喚醒被 put 函數(shù)阻塞的線程,因為從隊列當中取出數(shù)據(jù)了,隊列肯定已經(jīng)不滿了,因此可以喚醒被 put 函數(shù)阻塞的線程了。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 因為取數(shù)據(jù)的代碼涉及到數(shù)據(jù)競爭 也就是說多個線程同時競爭 數(shù)組數(shù)據(jù)items 因此需要用鎖保護起來 lock.lockInterruptibly(); try { // 當 count == 0 說明隊列當中沒有數(shù)據(jù) while (count == 0) notEmpty.await(); // 當隊列當中還有數(shù)據(jù)的時候可以將數(shù)據(jù)出隊 return dequeue(); } finally { lock.unlock(); }}private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings(“unchecked”) // 取出數(shù)據(jù) E x = (E) items[takeIndex]; items[takeIndex] = null; // 將對應的位置設置為 null 數(shù)據(jù)就可以被垃圾收集器回收了 if (++takeIndex == items.length) takeIndex = 0; count–; // 迭代器也需要出隊 如果不了 if (itrs != null) itrs.elementDequeued(); // 調(diào)用 signal 函數(shù) 將被 put 函數(shù)阻塞的線程喚醒 如果調(diào)用這個方法的時候沒有線程阻塞 // 那么調(diào)用這個方法相當于沒有調(diào)用 如果有線程阻塞那么將會喚醒一個線程 notFull.signal(); return x;}
同樣的道理這里也需要使用 while 循環(huán)去進行阻塞,否則可能存在 虛假喚醒 ,可能隊列當中沒有數(shù)據(jù)返回的數(shù)據(jù)為 null,而且會破壞隊列的結(jié)構(gòu)因為會涉及隊列的兩個端點的值的改變,也就是 takeIndex 和 putIndex 的改變。
offer函數(shù)
這個函數(shù)的作用和put函數(shù)一樣,只不過當隊列滿了的時候,這個函數(shù)返回false,加入數(shù)據(jù)成功之后這個函數(shù)返回true,下面的代碼就比較簡單了。
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { // 如果隊列當中的數(shù)據(jù)個數(shù)和數(shù)組的長度相等 說明隊列滿了 直接返回 false 即可 if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); }}
add函數(shù)
這個函數(shù)和上面兩個函數(shù)的意義也是一樣的,只不過當隊列滿了之后這個函數(shù)會拋出異常。
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException(“Queue full”);}
poll函數(shù)
這個函數(shù)和take函數(shù)的作用差不多,但是這個函數(shù)不會阻塞,當隊列當中沒有數(shù)據(jù)的時候直接返回null,有數(shù)據(jù)的話返回數(shù)據(jù)。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); }}
總結(jié)
在本篇文章當中主要介紹了JDK內(nèi)部是如何實現(xiàn) ArrayBlockingQueue 的,如果你對鎖和隊列的使用有一定的了解本篇文章應該還是比較容易理解的。在實現(xiàn) ArrayBlockingQueue 當中有以下需要注意的點:
- put 函數(shù),如果在往隊列當中加入數(shù)據(jù)的時候隊列滿了,則需要將線程掛起。在隊列當中有空間之后,線程被喚醒繼續(xù)執(zhí)行,在往隊列當中加入了數(shù)據(jù)之后,需要調(diào)用 signal 方法,喚醒被 take 函數(shù)阻塞的線程。
- take 函數(shù),如果在往隊列當中取出數(shù)據(jù)的時候隊列空了,則需要將線程掛起。在隊列當中有數(shù)據(jù)之后,線程被喚醒繼續(xù)執(zhí)行,在從隊列當中取出數(shù)據(jù)之后,需要調(diào)用 signal 方法,喚醒被 put 函數(shù)阻塞的線程。
- 在調(diào)用 await 函數(shù)的時候,需要小心 虛假喚醒 現(xiàn)象