1. 前言
RabbitMQ經常被用于服務模塊之間的解耦以及高并發削峰場景,之前的章節討論了不同服務模式的特點,但是在生產環境中,因為機器以及網絡設備的不可靠,保證消息的可靠是待解決的問題。在特定場景下消息可能存在丟失風險,本文將介紹如何預防這類的風險。
2. RabbitMQ消息丟失的場景
面試官提問:RabbitMQ 消息隊列,在哪些場景下可能會丟失消息?
題目解析:
我們可以將 RabbitMQ 消息處理的過程分為三個步驟:
(1)生產階段:生產者生產消息并且發送到消息隊列;
(2)儲存階段:消息隊列存儲和處理消息;
(3)消費階段:消息隊列將消息轉發到消費者。
上述每個步驟都有消息丟失的風險,候選人需要按順序分別解釋不同場景可能丟失的原因以及解決方案。
2.1 生產者生產消息并且發送到消息隊列
可能發生消息丟失的場景:網絡故障。網絡環境的不可靠導致消息發送失敗,例如網絡丟包、網絡故障。數據在網絡中傳輸會經過諸多網絡設備,只要其中一個網絡鏈接在數據抵達前已經流量滿載,新到的數據將會阻塞一段時間段。另外比較少見的例子是施工挖斷光纖或者其他原因導致硬件層面的長時間不可用。
參考解決方案是使用AMQP協議的事務機制。生產者在發出消息之后,消息是否到達RabbitMQ服務器是默認不可知的,所以在生產者發送消息之前,調用channel.txSelect
語句開啟事務,如果消息發送失敗,那么調用channel.txRollback
回滾事務,重新發送一條消息;如果消息發送成功,那么調用channel.txCommit
提交事務。
采用事務的缺點是增加耗時,會降低RabbitMQ的吞吐性能。
所以RabbitMQ還有一種性能改進方案,即Confirm機制,步驟如下:
(1)生產者調用channel.confirmSelect
將通信方式設置為confirm
模式;
(2)生產者發送的所有消息都會被分配一個唯一 ID;
(3)當生產者發送的消息成功投遞到隊列之后,RabbitMQ會發送一個確認給生產者,生產者即得知這條消息已經成功發送。
2.2 消息隊列存儲和處理消息
可能發生消息丟失的場景:服務器宕機。消息存儲在 RabbitMQ 隊列中,如果隊列沒有持久化,RabbitMQ 服務器重啟會導致消息丟失。
參考解決方案是對消息隊列持久化,分為三個步驟:
(1)Exchange 持久化:以 Direct 模式為例,將 durable 參數設置為 true。示例:
@Bean
DirectExchange testExchange() {
return new DirectExchange(Constants.EXCHANGE_NAME, true, false);
}
(2)Queue 持久化:將 durable 參數設置為 true,但是這樣只能保證持久化 Queue 的元數據,但是不會持久化 Queue 里存儲的消息。示例:
@Bean
public Queue testQueue() {
return new Queue(Constants.QUEUE_NAME);
}
(3)消息持久化:發送消息的時候將deliveryMode設置為2,SpringBoot中的rabbitTemplate默認設置消息是持久化,所以我們不需要手動配置,具體原因可參考源碼,示例:
public enum MessageDeliveryMode {
NON_PERSISTENT,
PERSISTENT;
private MessageDeliveryMode() {
}
public static int toInt(MessageDeliveryMode mode) {
switch(mode) {
//非持久化模式
case NON_PERSISTENT:
return 1;
//持久化模式
case PERSISTENT:
return 2;
default:
return -1;
}
}
2.3 消息隊列將消息轉發到消費者
可能發生消息丟失的場景:消費者在收到消息之后,還沒來得及處理消息的消費邏輯,所在機器就宕機了,導致內存中的消息丟失。
參考解決方案是在消費端開啟手動 ACK 模式。RabbitMQ 默認采用自動 ACK 機制,在沒有處理業務邏輯之前,消費者就會告知消息隊列已經成功收到消息,這種方式并不符合我們的預期。
以 SpringBoot 的配置方式為例,有兩種配置手動 ACK 的方式:
(1)yml文件修改全局確認模式,示例:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
(2)在自動注入 RabbitListenerContainerFactory 時開啟手動ACK,示例:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//1. 創建工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//2. 設置手動ACK模式,即AcknowledgeMode.MANUAL
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
3. 小結
本章節介紹了 RabbitMQ 作為消息隊列,容易產生消息丟失的三種場景,以及針對每種場景的關鍵解決方案,從性質上可以分為持久化和消息確認機制。拋開題目本身來說,建議候選人通過本地環境實戰來體驗每種解決方案的具體編碼,加強對方案的理解。