queue 模塊即隊列,特別適合處理信息在多個線程間安全交換的多線程程序中。下面我們對 queue 模塊進行一個詳細的使用介紹。
queue 模塊定義的類和異常
queue 模塊定義了以下四種不同類型的隊列,它們之間的區(qū)別在于數(shù)據(jù)入隊列之后出隊列的順序不同。
queue.Queue(maxsize=0)
先進先出(First In First Out: FIFO)隊列,最早進入隊列的數(shù)據(jù)擁有出隊列的優(yōu)先權(quán),就像看電影入場時排隊一樣,排在隊伍前頭的優(yōu)先進入電影院。
入?yún)?maxsize 是一個整數(shù),用于設(shè)置隊列的最大長度。一旦隊列達到上限,插入數(shù)據(jù)將會被阻塞,直到有數(shù)據(jù)出隊列之后才可以繼續(xù)插入。如果 maxsize 設(shè)置為小于或等于零,則隊列的長度沒有限制。
示例如下:
import queueq = queue.Queue() # 創(chuàng)建 Queue 隊列for i in range(3): q.put(i) # 在隊列中依次插入0、1、2元素for i in range(3): print(q.get()) # 依次從隊列中取出插入的元素,數(shù)據(jù)元素輸出順序為0、1、2
queue.LifoQueue(maxsize=0)
后進先出(Last In First Out: LIFO)隊列,最后進入隊列的數(shù)據(jù)擁有出隊列的優(yōu)先權(quán),就像棧一樣。
入?yún)?maxsize 與先進先出隊列的定義一樣。
示例如下:
import queueq = queue.LifoQueue() # 創(chuàng)建 LifoQueue 隊列for i in range(3): q.put(i) # 在隊列中依次插入0、1、2元素for i in range(3): print(q.get()) # 依次從隊列中取出插入的元素,數(shù)據(jù)元素輸出順序為2、1、0
PriorityQueue(maxsize=0)
優(yōu)先級隊列,比較隊列中每個數(shù)據(jù)的大小,值最小的數(shù)據(jù)擁有出隊列的優(yōu)先權(quán)。數(shù)據(jù)一般以元組的形式插入,典型形式為(priority_number, data)。如果隊列中的數(shù)據(jù)沒有可比性,那么數(shù)據(jù)將被包裝在一個類中,忽略數(shù)據(jù)值,僅僅比較優(yōu)先級數(shù)字。
入?yún)?maxsize 與先進先出隊列的定義一樣。
示例如下:
import queueq = queue.PriorityQueue() # 創(chuàng)建 PriorityQueue 隊列data1 = (1, ‘python’)data2 = (2, ‘-‘)data3 = (3, ‘100’)style = (data2, data3, data1)for i in style: q.put(i) # 在隊列中依次插入元素 data2、data3、data1for i in range(3): print(q.get()) # 依次從隊列中取出插入的元素,數(shù)據(jù)元素輸出順序為 data1、data2、data3
queue.SimpleQueue
先進先出類型的簡單隊列,沒有大小限制。由于它是簡單隊列,相比于 Queue 隊列會缺少一些高級功能,下面第2-3小節(jié)將會介紹。
示例如下:
import queueq = queue.SimpleQueue() # 創(chuàng)建 SimpleQueue 隊列for i in range(3): q.put(i) # 在隊列中依次插入0、1、2元素for i in range(3): print(q.get()) # 依次從隊列中取出插入的元素,數(shù)據(jù)元素輸出順序為0、1、2
queue.Empty 異常
當(dāng)隊列中沒有數(shù)據(jù)元素時,取出隊列中的數(shù)據(jù)會引發(fā) queue.Empty 異常,主要是不正當(dāng)使用 get() 和 get_nowait() 引起的。
示例如下:
import queuetry: q = queue.Queue(3) # 設(shè)置隊列上限為3 q.put(‘python’) # 在隊列中插入字符串 ‘python’ q.put(‘-‘) # 在隊列中插入字符串 ‘-‘ q.put(‘100’) # 在隊列中插入字符串 ‘100’ for i in range(4): # 從隊列中取數(shù)據(jù),取出次數(shù)為4次,引發(fā) queue.Empty 異常 print(q.get(block=False))except queue.Empty: print(‘queue.Empty’)
queue.Full 異常
當(dāng)隊列數(shù)據(jù)元素容量達到上限時,繼續(xù)往隊列中放入數(shù)據(jù)會引發(fā) queue.Empty 異常,主要是不正當(dāng)使用 put() 和 put_nowait() 引起的。
示例如下:
import queuetry: q = queue.Queue(3) # 設(shè)置隊列上限為3 q.put(‘python’) # 在隊列中插入字符串 ‘python’ q.put(‘-‘) # 在隊列中插入字符串 ‘-‘ q.put(‘100’) # 在隊列中插入字符串 ‘100’ q.put(‘stay hungry, stay foolish’, block=False) # 隊列已滿,繼續(xù)往隊列中放入數(shù)據(jù),引發(fā) queue.Full 異常except queue.Full: print(‘queue.Full’)
Queue、LifoQueue、PriorityQueue 和 SimpleQueue 對象的基本使用方法
Queue、LifoQueue、PriorityQueue 和 SimpleQueue 四種隊列定義的對象均提供了以下函數(shù)使用方法,下面以 Queue 隊列為例進行介紹。
Queue.qsize()
返回隊列中數(shù)據(jù)元素的個數(shù)。
示例如下:
import queueq = queue.Queue()q.put(‘python-100’) # 在隊列中插入元素 ‘python-100’print(q.qsize()) # 輸出隊列中元素個數(shù)為1
Queue.empty()
如果隊列為空,返回 True,否則返回 False。
示例如下:
import queueq = queue.Queue()print(q.empty()) # 對列為空,返回 Trueq.put(‘python-100’) # 在隊列中插入元素 ‘python-100’print(q.empty()) # 對列不為空,返回 False
Queue.full()
如果隊列中元素個數(shù)達到上限,返回 True,否則返回 False。
示例如下:
import queueq = queue.Queue(3) # 定義一個長度為3的隊列print(q.full()) # 元素個數(shù)未達到上限,返回 Falseq.put(‘python’) # 在隊列中插入字符串 ‘python’q.put(‘-‘) # 在隊列中插入字符串 ‘-‘q.put(‘100’) # 在隊列中插入字符串 ‘100’print(q.full()) # 元素個數(shù)達到上限,返回 True
Queue.put(item, block=True, timeout=None)
- item,放入隊列中的數(shù)據(jù)元素。
- block,當(dāng)隊列中元素個數(shù)達到上限繼續(xù)往里放數(shù)據(jù)時:如果 block=False,直接引發(fā) queue.Full 異常;如果 block=True,且 timeout=None,則一直等待直到有數(shù)據(jù)出隊列后可以放入數(shù)據(jù);如果 block=True,且 timeout=N,N 為某一正整數(shù)時,則等待 N 秒,如果隊列中還沒有位置放入數(shù)據(jù)就引發(fā) queue.Full 異常。
- timeout,設(shè)置超時時間。
示例如下:
import queuetry: q = queue.Queue(2) # 設(shè)置隊列上限為2 q.put(‘python’) # 在隊列中插入字符串 ‘python’ q.put(‘-‘) # 在隊列中插入字符串 ‘-‘ q.put(‘100’, block = True, timeout = 5) # 隊列已滿,繼續(xù)在隊列中插入字符串 ‘100’,等待5秒后會引發(fā) queue.Full 異常except queue.Full: print(‘queue.Full’)
Queue.put_nowait(item)
相當(dāng)于 Queue.put(item, block=False),當(dāng)隊列中元素個數(shù)達到上限繼續(xù)往里放數(shù)據(jù)時直接引發(fā) queue.Full 異常。
import queuetry: q = queue.Queue(2) # 設(shè)置隊列上限為2 q.put_nowait(‘python’) # 在隊列中插入字符串 ‘python’ q.put_nowait(‘-‘) # 在隊列中插入字符串 ‘-‘ q.put_nowait(‘100’) # 隊列已滿,繼續(xù)在隊列中插入字符串 ‘100’,直接引發(fā) queue.Full 異常except queue.Full: print(‘queue.Full’)
Queue.get(block=True, timeout=None)
從隊列中取出數(shù)據(jù)并返回該數(shù)據(jù)內(nèi)容。
- block,當(dāng)隊列中沒有數(shù)據(jù)元素繼續(xù)取數(shù)據(jù)時:如果 block=False,直接引發(fā) queue.Empty 異常;如果 block=True,且 timeout=None,則一直等待直到有數(shù)據(jù)入隊列后可以取出數(shù)據(jù);如果 block=True,且 timeout=N,N 為某一正整數(shù)時,則等待 N 秒,如果隊列中還沒有數(shù)據(jù)放入的話就引發(fā) queue.Empty 異常。
- timeout,設(shè)置超時時間。
示例如下:
import queuetry: q = queue.Queue() q.get(block = True, timeout = 5) # 隊列為空,往隊列中取數(shù)據(jù)時,等待5秒后會引發(fā) queue.Empty 異常except queue.Empty: print(‘queue.Empty’)
Queue.get_nowait()
相當(dāng)于 Queue.get(block=False)block,當(dāng)隊列中沒有數(shù)據(jù)元素繼續(xù)取數(shù)據(jù)時直接引發(fā) queue.Empty 異常。
示例如下:
import queuetry: q = queue.Queue() q.get_nowait() # 隊列為空,往隊列中取數(shù)據(jù)時直接引發(fā) queue.Empty 異常except queue.Empty: print(‘queue.Empty’)
Queue、LifoQueue 和 PriorityQueue 對象的高級使用方法
SimpleQueue 是 Python 3.7 版本中新加入的特性,與 Queue、LifoQueue 和 PriorityQueue 三種隊列相比缺少了 task_done 和 join 的高級使用方法,所以才會取名叫 Simple 了,下面介紹一下 task_done 和 join 的使用方法。
- task_done,表示隊列內(nèi)的數(shù)據(jù)元素已經(jīng)被取出,即每個 get 用于獲取一個數(shù)據(jù)元素, 后續(xù)調(diào)用 task_done 告訴隊列,該數(shù)據(jù)的處理已經(jīng)完成。如果被調(diào)用的次數(shù)多于放入隊列中的元素個數(shù),將引發(fā) ValueError 異常。
- join,一直阻塞直到隊列中的所有數(shù)據(jù)元素都被取出和執(zhí)行,只要有元素添加到 queue 中就會增加。當(dāng)未完成任務(wù)的計數(shù)等于0,join 就不會阻塞。
示例如下:
import queueq = queue.Queue()q.put(‘python’)q.put(‘-‘)q.put(‘100’)for i in range(3): print(q.get()) q.task_done() # 如果不執(zhí)行 task_done,join 會一直處于阻塞狀態(tài),等待 task_done 告知它數(shù)據(jù)的處理已經(jīng)完成q.join()
下面是一個經(jīng)典示例,生產(chǎn)者和消費者線程分別生產(chǎn)數(shù)據(jù)和消費數(shù)據(jù),先生產(chǎn)后消費。采用 task_done 和 join 確保處理信息在多個線程間安全交換,生產(chǎn)者生產(chǎn)的數(shù)據(jù)能夠全部被消費者消費掉。
from queue import Queueimport randomimport threadingimport time#生產(chǎn)者線程class Producer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): for i in range(5): print (“%s: %s is producing %d to the queue!” %(time.ctime(), self.getName(), i)) self.data.put(i) # 將生產(chǎn)的數(shù)據(jù)放入隊列 time.sleep(random.randrange(10)/5) print (“%s: %s finished!” %(time.ctime(), self.getName()))#消費者線程class Consumer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): for i in range(5): val = self.data.get() # 拿出已經(jīng)生產(chǎn)好的數(shù)據(jù) print (“%s: %s is consuming. %d in the queue is consumed!” %(time.ctime(), self.getName(), val)) time.sleep(random.randrange(5)) self.data.task_done() # 告訴隊列有關(guān)這個數(shù)據(jù)的任務(wù)已經(jīng)處理完成 print (“%s: %s finished!” %(time.ctime(), self.getName()))#主線程def main(): queue = Queue() producer = Producer(‘Pro.’, queue) consumer = Consumer(‘Con.’, queue) producer.start() consumer.start() queue.join() # 阻塞,直到生產(chǎn)者生產(chǎn)的數(shù)據(jù)全都被消費掉 producer.join() # 等待生產(chǎn)者線程結(jié)束 consumer.join() # 等待消費者線程結(jié)束 print (‘All threads terminate!’) if __name__ == ‘__main__’: main()
總結(jié)
本節(jié)給大家介紹了 Python 的 queue 模塊,為 Python 工程師對該模塊的使用提供了支撐,讓大家對 queue 模塊的相關(guān)概念和使用有一個初步的了解。