亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何將消息保存到數據庫并將響應發送到主題最終一致?

如何將消息保存到數據庫并將響應發送到主題最終一致?

侃侃無極 2023-03-02 16:17:21
我有以下 rabbitMq 消費者:Consumer consumer = new DefaultConsumer(channel) {    @Override      public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {            String message = new String(body, "UTF-8");             sendNotificationIntoTopic(message);             saveIntoDatabase(message);      } };可能會出現以下情況:消息已成功發送到主題與數據庫的連接丟失,因此數據庫插入失敗。結果我們有數據不一致。預期結果要么兩個操作都成功執行,要么根本沒有執行。任何解決方案我怎樣才能實現它?聚苯乙烯目前我有以下想法(請評論)我們可以假設代理不會丟失任何消息。我們必須訂閱要發送的主題。將條目保存到數據庫中并設置status值為“pending”的 字段嘗試向主題發送數據。如果發送成功 - 更新status值為“成功”的字段我們必須有一個計劃作業,它必須檢查具有掛起狀態的行。目前可能有兩種情況:3.1 根本沒有發送通知3.2 發送了通知但存入數據庫失敗(概率很低但有可能)所以我們必須以某種方式區分這兩種情況:我們可以將來自主題的消息存儲在集合中,作業可以檢查消息是否被接受。因此,如果作業找到與數據庫行對應的消息,我們必須將狀態更新為“成功”。否則我們必須從數據庫中刪除條目。我認為我的想法有一些弱點(例如,如果我們有多節點應用程序,我們必須將消息存儲在 hazelcast(或類似物)中,但這是假設失敗的額外點)
查看完整描述

3 回答

?
慕哥9229398

TA貢獻1877條經驗 獲得超6個贊

這是嘗試取消確認模式 https://servicecomb.apache.org/docs/distributed_saga_3/的示例 ,它應該能夠處理您的問題。您應該容忍通過隊列重復提交數據的機會。這是一個例子:

  1. 定義抽象操作并為操作分配 ID 和時間戳。

  2. 將狀態Pending寫入數據庫(可以和1一樣的步驟)

  3. 編寫一個偵聽器,輪詢數據庫中所有狀態為掛起且早于“超時”的操作

  4. 對于每個掛起的操作,通過具有分配 ID 的隊列發送數據。

  5. 接收方應該知道 ID,如果 ID 已被處理,則不會發生任何事情。

6A。如果您需要 100% 確認操作已完成,您需要第二個隊列,接收方將在其中發布消息 ID - DONE。如果不需要這種一致性,請跳過此步驟?;蛘撸梢园l布 ID -Failed 失敗原因。

6B。提交方要么等待來自 6A 的消息,要么通過將狀態 DONE 寫入數據庫來完成操作。

  • 一旦 sertine 超時已過或某個重試限制已過。您將狀態寫入操作 FAIL。

  • 您可以通過 ID 回滾將消息發送到接收方操作。

請注意,所有這些步驟都不涉及技術事務。您可以使用非事務性數據庫執行此操作。

我寫的是嘗試取消確認模式的變體,其中每個消息接收者都應該知道如何管理自己的數據。


查看完整回答
反對 回復 2023-03-02
?
MM們

TA貢獻1886條經驗 獲得超2個贊

如果有足夠的時間來修改設計,建議使用類似 JTA 的 API 來管理 2phase 提交。甚至 weblogic 和 WebSphere 也支持用于兩階段提交的 XA 資源。

如果時間線較少,建議執行以下操作以減少失敗間隔。

  • 發送數據主題(不提交)(incase topic down, retry to be perform with a interval)

  • 將數據寫入數據庫

  • 提交數據庫

  • 提交主題

只有當第 4 步失敗時才會發生這里失敗。這將導致再次發送相同的消息。所以接收系統會收到重復的消息。在JMS2.0 結構中,每條消息都有唯一的messageID 和CorrelationID。所以找到重復項有點直截了當(但這將在接收系統中處理)

這兩種情況也適用于集群環境。


嚴格針對您的情況,認為以下步驟可能有助于解決您的問題

為您的主題訂閱一個偵聽器 listener-1。

過程-1

  • 為消息 msg-1 添加狀態為“待發送”的數據庫條目

  • 向主題發送消息 msg-1。在任何主題失敗的情況下重試發送如果在某些重試后步驟 2 失敗,process-1 必須在發送任何新消息之前重新發送 msg-1 或回滾步驟 1

聽眾-1

  • 使用訂閱的偵聽器,從主題讀取參考(meesageID/correlationID),并將數據庫狀態更新為已發送,并從主題讀取/刪除消息。如果參考讀取成功并且數據庫更新失敗,主題仍然有消息。所以下一次讀取將更新數據庫。Incase 數據庫更新成功但消息刪除失敗。聽眾將再次閱讀并嘗試更新已經完成的消息。所以驗證后可以忽略。

Incase listener 本身宕機,topic 將有消息,直到 listener 閱讀消息。在此之前,SENT 消息將處于“待發送”狀態。


查看完整回答
反對 回復 2023-03-02
?
慕俠2389804

TA貢獻1719條經驗 獲得超6個贊

  1. 在偵聽器中保存數據庫行,其中包含字段 staus='pending'

  2. 另一個作業(獨立線程)將從數據庫中獲取所有待處理的行,并對每一行進行以下操作:
    2.1 將數據發送到主題
    2.2 保存到數據庫中

如果我們在第 1 步失敗- 一切正常 - 數據處于一致狀態,因為作業不會知道該數據的任何信息

如果我們在步驟 2.1 上失敗了——沒問題,下一個作業調用將嘗試處理它

如果我們在步驟 2.2 上失敗了——如果我們在這里失敗了——這意味著下一個作業調用將再次處理相同的數據。乍一看你可以認為這是一個問題。但是您的消費者必須是冪等的——這意味著它必須了解消息已經被處理并跳過處理。此要求是所有消息代理都保證消息將至少傳遞一次的結果。因此,無論如何,我們的消費者都必須為重復的消息做好準備。沒問題了。


查看完整回答
反對 回復 2023-03-02
  • 3 回答
  • 0 關注
  • 172 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號