生產者與消費者案例
1. 前言
本節內容是通過之前學習的 synchronized 關鍵字,實現多線程并發編程中最經典的生產者與消費者模式,這是本節課程的核心內容,所有的知識點都是圍繞這一經典模型展開的。本節有如下知識點:
- 生產者與消費者模型介紹,這是打開本節知識大門的鑰匙,也是本節內容的基礎;
- 了解生產者與消費者案例實現的三種方式,我們本節以 synchronized 關鍵字聯合 wait/notify 機制進行實現;
- wait 方法和 notify 方法介紹,這是我們實現生產者與消費者案例的技術基礎;
- 生產者與消費者案例代碼實現,這是我們本節內容的核心,一定要對此知識點進行深入的學習和掌握。
2. 生產者與消費者模型介紹
定義: 生產者消費者模式是一個十分經典的多線程并發協作的模式。
意義:弄懂生產者消費者問題能夠讓我們對并發編程的理解加深。
介紹:所謂生產者 - 消費者問題,實際上主要是包含了兩類線程,一種是生產者線程用于生產數據,另一種是消費者線程用于消費數據,為了解耦生產者和消費者的關系,通常會采用共享的數據區域。
共享的數據區域就像是一個倉庫,生產者生產數據之后直接放置在共享數據區中,并不需要關心消費者的行為。而消費者只需要從共享數據區中去獲取數據,就不再需要關心生產者的行為。
3. 生產者與消費者三種實現方式
在實現生產者消費者問題時,可以采用三種方式:
- 使用 Object 的 wait/notify 的消息通知機制,本節課程我們采用該方式結合 synchronized 關鍵字進行生產者與消費者模式的實現;
- 使用 Lock 的 Condition 的 await/signal 的消息通知機制;
- 使用 BlockingQueue 實現。本文主要將這三種實現方式進行總結歸納。
4. wait 與 notify
Java 中,可以通過配合調用 Object 對象的 wait () 方法和 notify () 方法或 notifyAll () 方法來實現線程間的通信。
wait 方法:我們之前對 wait 方法有了基礎的了解,在線程中調用 wait () 方法,將阻塞當前線程,并且釋放鎖,直至等到其他線程調用了調用 notify () 方法或 notifyAll () 方法進行通知之后,當前線程才能從 wait () 方法出返回,繼續執行下面的操作。
notify 方法:即喚醒,notify 方法使原來在該對象上 wait 的線程退出 waiting 狀態,使得該線程從等待隊列中移入到同步隊列中去,等待下一次能夠有機會獲取到對象監視器鎖。
notifyAll 方法:即喚醒全部 waiting 線程,與 notify 方法在效果上一致。
5. 生產者與消費者案例
為了更好地理解并掌握生產者與消費者模式的實現,我們先來進行場景設計,然后再通過實例代碼進行實現并觀察運行結果。
場景設計:
- 創建一個工廠類 ProductFactory,該類包含兩個方法,produce 生產方法和 consume 消費方法;
- 對于 produce 方法,當沒有庫存或者庫存達到 10 時,停止生產。為了更便于觀察結果,每生產一個產品,sleep 5000 毫秒;
- 對于 consume 方法,只要有庫存就進行消費。為了更便于觀察結果,每消費一個產品,sleep 5000 毫秒;
- 庫存使用 LinkedList 進行實現,此時 LinkedList 即共享數據內存;
- 創建一個 Producer 生產者類,用于調用 ProductFactory 的 produce 方法。生產過程中,要對每個產品從 0 開始進行編號;
- 創建一個 Consumer 消費者類,用于調用 ProductFactory 的 consume 方法;
- 創建一個測試類,main 函數中創建 2 個生產者和 3 個消費者,運行程序進行結果觀察。
實例:創建一個工廠類 ProductFactory
class ProductFactory {
private LinkedList<String> products; //根據需求定義庫存,用 LinkedList 實現
private int capacity = 10; // 根據需求:定義最大庫存 10
public ProductFactory() {
products = new LinkedList<String>();
}
// 根據需求:produce 方法創建
public synchronized void produce(String product) {
while (capacity == products.size()) { //根據需求:如果達到 10 庫存,停止生產
try {
System.out.println("警告:線程("+Thread.currentThread().getName() + ")準備生產產品,但產品池已滿");
wait(); // 庫存達到 10 ,生產線程進入 wait 狀態
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.add(product); //如果沒有到 10 庫存,進行產品添加
try {
Thread.sleep(5000); //根據需求為了便于觀察結果,每生產一個產品,sleep 5000 ms
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程("+Thread.currentThread().getName() + ")生產了一件產品:" + product+";當前剩余商品"+products.size()+"個");
notify(); //生產了產品,通知消費者線程從 wait 狀態喚醒,進行消費
}
// 根據需求:consume 方法創建
public synchronized String consume() {
while (products.size()==0) { //根據需求:沒有庫存消費者進入wait狀態
try {
System.out.println("警告:線程("+Thread.currentThread().getName() + ")準備消費產品,但當前沒有產品");
wait(); //庫存為 0 ,無法消費,進入 wait ,等待生產者線程喚醒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String product = products.remove(0) ; //如果有庫存則消費,并移除消費掉的產品
try {
Thread.sleep(5000);//根據需求為了便于觀察結果,每消費一個產品,sleep 5000 ms
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程("+Thread.currentThread().getName() + ")消費了一件產品:" + product+";當前剩余商品"+products.size()+"個");
notify();// 通知生產者繼續生產
return product;
}
}
實例:Producer 生產者類創建
class Producer implements Runnable {
private ProductFactory productFactory; //關聯工廠類,調用 produce 方法
public Producer(ProductFactory productFactory) {
this.productFactory = productFactory;
}
public void run() {
int i = 0 ; // 根據需求,對產品進行編號
while (true) {
productFactory.produce(String.valueOf(i)); //根據需求 ,調用 productFactory 的 produce 方法
i++;
}
}
}
實例:Consumer 消費者類創建
class Consumer implements Runnable {
private ProductFactory productFactory;
public Consumer(ProductFactory productFactory) {
this.productFactory = productFactory;
}
public void run() {
while (true) {
productFactory.consume();
}
}
}
實例: 創建測試類,2 個生產者,3 個消費者
public class DemoTest extends Thread{
public static void main(String[] args) {
ProductFactory productFactory = new ProductFactory();
new Thread(new Producer(productFactory),"1號生產者"). start();
new Thread(new Producer(productFactory),"2號生產者"). start();
new Thread(new Consumer(productFactory),"1號消費者"). start();
new Thread(new Consumer(productFactory),"2號消費者"). start();
new Thread(new Consumer(productFactory),"3號消費者"). start();
}
}
結果驗證:
線程(1號生產者)生產了一件產品:0;當前剩余商品1個
線程(3號消費者)消費了一件產品:0;當前剩余商品0個
警告:線程(2號消費者)準備消費產品,但當前沒有產品
警告:線程(1號消費者)準備消費產品,但當前沒有產品
線程(2號生產者)生產了一件產品:0;當前剩余商品1個
線程(2號消費者)消費了一件產品:0;當前剩余商品0個
警告:線程(1號消費者)準備消費產品,但當前沒有產品
線程(2號生產者)生產了一件產品:1;當前剩余商品1個
線程(3號消費者)消費了一件產品:1;當前剩余商品0個
線程(1號生產者)生產了一件產品:1;當前剩余商品1個
線程(3號消費者)消費了一件產品:1;當前剩余商品0個
線程(2號生產者)生產了一件產品:2;當前剩余商品1個
線程(1號消費者)消費了一件產品:2;當前剩余商品0個
警告:線程(2號消費者)準備消費產品,但當前沒有產品
線程(2號生產者)生產了一件產品:3;當前剩余商品1個
...
...
結果分析:
從結果來看,生產者線程和消費者線程合作無間,當沒有產品時,消費者線程進入等待;當產品達到 10 個最大庫存是,生產者進入等待。這就是經典的生產者 - 消費者模型。
6. 小結
實現多線程并發編程中最經典的生產者與消費者模式,這是本節課程的核心內容,所有的知識點都是圍繞這一經典模型展開的。 在掌握 synchronized 關鍵字,wait 方法和 notify 方法的基礎上,理解并掌握生產者與消費者模式是本節課程的最終目標。