亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

Python 生產者消費者模型

1. 簡介

生產者和消費者問題是線程模型中的經典問題:

  • 生產者和消費者共享同一個存儲空間
  • 生產者往存儲空間中添加產品,消費者從存儲空間中取走產品
  • 當存儲空間為空時,消費者阻塞,當存儲空間滿時,生產者阻塞

Python 的內置模塊 queue 提供了對生產者和消費者模型的支持,模塊 queue 定義了類 Queue,類 Queue 表示一個被生產者和消費者共享的隊列,類 Queue 提供如下常用方法:

方法 功能
get() 從隊列中取走數據,如果隊列為空,則阻塞
put(item) 向隊列中放置數據,如果隊列為慢,則阻塞
join() 如果隊列不為空,則等待隊列變為空
task_done() 消費者從隊列中取走一項數據,當隊列變為空時,喚醒調用 join() 的線程

2. 實現生產者消費者模型

創建生產者線程和消費者線程,使用一個共享隊列連接這兩個線程,代碼如下:

import threading
import queue

q = queue.Queue()
  • 導入 threading 模塊和 queue 模塊
  • 創建共享隊列 q
def produce():
    for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
        q.put(item)
        print('produce %s' % item)
  • 創建生產者線程的入口函數 produce
  • 生產者生產 8 個數據
  • 調用 q.put(item) 將生產的數據放入到共享隊列 q 中
def consume():
    for i in range(8):
        item = q.get()
        print('  consume %s' % item)
  • 創建消費者線程的入口函數 consume
  • 消費者消費 8 個數據
  • 調用 q.get() 從共享隊列 q 中取走數據
producer = threading.Thread(target=produce, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
consumer.start()
producer.join()
consumer.join()
  • 創建生產者線程 producer,線程入口為 produce
  • 創建消費者線程 consumer,線程入口為 consume
  • 啟動生產者線程和消費者線程,并等待它們結束

運行程序,輸出結果如下:

produce a
produce b
  consume a
produce c
  consume b
  consume c
produce d
  consume d
produce e
  consume e
produce f
  consume f
produce g
  consume g
produce h
  consume h
  • 生產者生產了 8 個數據:a、b、c、d、e、f、g、h
  • 消費者取走了 8 個數據:a、b、c、d、e、f、g、h

3. 實現生產者、計算者、消費者模型

創建生產者、計算者、消費者線程:

  • 生產者生產 8 個數據
  • 計算者對生產者輸出的數據進行加工,將加工后的數據送往消費者
  • 消費者取走計算者輸出的數據
import threading
import queue

q0 = queue.Queue()
q1 = queue.Queue()
  • 導入模塊 threading 和模塊 queue
  • 使用兩個共享隊列連接這三個線程
    • 共享隊列 q0 連接生產者和計算者
    • 共享隊列 q1 連接計算者和消費者
def produce():
    for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
        q0.put(item)
        print('produce %s' % item)
  • 創建生產者線程的入口函數 produce
  • 生產者生產 8 個數據
  • 調用 q0.put(item) 將生產的數據放入到共享隊列 q0 中
def compute():
    for i in range(8):
        item = q0.get()
        item = item.upper() 
        q1.put(item)
  • 創建計算者線程的入口函數 compute
  • 調用 q0.get() 讀取生產者輸出數據,并進行加工
  • 調用 q1.put(item) 將加工后的數據放入到共享隊列 q1 中
def consume():
    for i in range(8):
        item = q1.get()
        print('  consume %s' % item)
  • 創建消費者線程的入口函數 consume
  • 消費者消費 8 個數據
  • 調用 q1.get() 從共享隊列 q1 中取走數據
producer = threading.Thread(target=produce, args=())
computer = threading.Thread(target=compute, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
computer.start()
consumer.start()

producer.join()
computer.join()
consumer.join()
  • 創建生產者線程 producer,線程入口為 produce
  • 創建計算者線程 computer,線程入口為 compute
  • 創建消費者線程 consumer,線程入口為 consume
  • 啟動生產者線程、計算者線程、消費者線程,并等待它們結束

運行程序,輸出結果如下:

produce a
produce b
produce c
  consume A
produce d
produce e
  consume B
produce f
  consume C
produce g
  consume D
produce h
  consume E
  consume F
  consume G
  consume H
  • 生產者生產了 8 個數據:a、b、c、d、e、f、g、h
  • 計算者將數據加工為:A、B、C、D、E、F、G、H
  • 消費者取走了 8 個數據:A、B、C、D、E、F、G、H

4. 同步生產者與消費者的推進速度

在生產者、消費者模型中,可能會存在兩者推進速度不匹配的問題:生產者生產數據的速度較快,但是,消費者取走數據的速度較慢。

可以使用 queue 的 task_done() 方法和 join() 方法同步生產者與消費者的推進速度:

  • 生產者調用 join() 方法,等待隊列中所有的數據被取走
  • 消費者調用 task_done() 方法,表示取走了隊列中的一項數據,當隊列為空時,喚醒阻塞在 join() 方法中的生產者
import threading
import queue

q = queue.Queue()
  • 導入 threading 模塊和 queue 模塊
  • 創建共享隊列 q
def produce():
    for item in ['A', 'B', 'C', 'D']:
        q.put(item)
        print('produce %s' % item)
    q.join()
    print('------------ q is empty')

    for item in ['E', 'F', 'G', 'H']:
        q.put(item)            
        print('produce %s' % item)
    q.join()        
    print('------------ q is empty')
  • 創建生產者線程的入口函數 produce
  • 首先,生產 4 個數據:A、B、C、D
    • 調用 q.put(item) 將它們放入到隊列 q 中
    • 調用 q.join() 等待消費者將它們全部取走
  • 然后,生產 4 個數據:E、F、G、G
    • 調用 q.put(item) 將它們放入到隊列 q 中
    • 調用 q.join() 等待消費者將它們全部取走
def consume():
    for i in range(8):
        item = q.get()
        print('  consume %s' % item)
        q.task_done()
  • 創建消費者線程的入口函數 consume
  • 調用 q.get() 從隊列 q 中取走一個數據
  • 調用 q.task_done(),表示已經從隊列 q 中取走了一個數據,當隊列為空時,喚醒生產者
producer = threading.Thread(target=produce, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
consumer.start()
  • 創建生產者線程 producer,線程入口為 produce
  • 創建消費者線程 consumer,線程入口為 consume
  • 啟動生產者線程和消費者線程,并等待它們結束

運行程序,輸出結果如下:

produce A
produce B
  consume A
  consume B
produce C
  consume C
produce D
  consume D
------------ q is empty
produce E
  consume E
produce F
  consume F
produce G
produce H
  consume G
  consume H
------------ q is empty
  • 生產者生產第一批數據 A、B、C、D,消費者將其取走
  • 當第一批數據完全被消費者取走后,生產者才開始生產第二批數據
  • 生產者生產第二批數據 E、F、G、H,消費者將其取走