3 回答

TA貢獻1877條經驗 獲得超6個贊
這是嘗試取消確認模式 https://servicecomb.apache.org/docs/distributed_saga_3/的示例 ,它應該能夠處理您的問題。您應該容忍通過隊列重復提交數據的機會。這是一個例子:
定義抽象操作并為操作分配 ID 和時間戳。
將狀態Pending寫入數據庫(可以和1一樣的步驟)
編寫一個偵聽器,輪詢數據庫中所有狀態為掛起且早于“超時”的操作
對于每個掛起的操作,通過具有分配 ID 的隊列發送數據。
接收方應該知道 ID,如果 ID 已被處理,則不會發生任何事情。
6A。如果您需要 100% 確認操作已完成,您需要第二個隊列,接收方將在其中發布消息 ID - DONE。如果不需要這種一致性,請跳過此步驟?;蛘撸梢园l布 ID -Failed 失敗原因。
6B。提交方要么等待來自 6A 的消息,要么通過將狀態 DONE 寫入數據庫來完成操作。
一旦 sertine 超時已過或某個重試限制已過。您將狀態寫入操作 FAIL。
您可以通過 ID 回滾將消息發送到接收方操作。
請注意,所有這些步驟都不涉及技術事務。您可以使用非事務性數據庫執行此操作。
我寫的是嘗試取消確認模式的變體,其中每個消息接收者都應該知道如何管理自己的數據。

TA貢獻1886條經驗 獲得超2個贊
如果有足夠的時間來修改設計,建議使用類似 JTA 的 API 來管理 2phase 提交。甚至 weblogic 和 WebSphere 也支持用于兩階段提交的 XA 資源。
如果時間線較少,建議執行以下操作以減少失敗間隔。
發送數據主題(不提交)(incase topic down, retry to be perform with a interval)
將數據寫入數據庫
提交數據庫
提交主題
只有當第 4 步失敗時才會發生這里失敗。這將導致再次發送相同的消息。所以接收系統會收到重復的消息。在JMS2.0 結構中,每條消息都有唯一的messageID 和CorrelationID。所以找到重復項有點直截了當(但這將在接收系統中處理)
這兩種情況也適用于集群環境。
嚴格針對您的情況,認為以下步驟可能有助于解決您的問題
為您的主題訂閱一個偵聽器 listener-1。
過程-1
為消息 msg-1 添加狀態為“待發送”的數據庫條目
向主題發送消息 msg-1。在任何主題失敗的情況下重試發送如果在某些重試后步驟 2 失敗,process-1 必須在發送任何新消息之前重新發送 msg-1 或回滾步驟 1
聽眾-1
使用訂閱的偵聽器,從主題讀取參考(meesageID/correlationID),并將數據庫狀態更新為已發送,并從主題讀取/刪除消息。如果參考讀取成功并且數據庫更新失敗,主題仍然有消息。所以下一次讀取將更新數據庫。Incase 數據庫更新成功但消息刪除失敗。聽眾將再次閱讀并嘗試更新已經完成的消息。所以驗證后可以忽略。
Incase listener 本身宕機,topic 將有消息,直到 listener 閱讀消息。在此之前,SENT 消息將處于“待發送”狀態。

TA貢獻1719條經驗 獲得超6個贊
在偵聽器中保存數據庫行,其中包含字段 staus='pending'
另一個作業(獨立線程)將從數據庫中獲取所有待處理的行,并對每一行進行以下操作:
2.1 將數據發送到主題
2.2 保存到數據庫中
如果我們在第 1 步失敗- 一切正常 - 數據處于一致狀態,因為作業不會知道該數據的任何信息
如果我們在步驟 2.1 上失敗了——沒問題,下一個作業調用將嘗試處理它
如果我們在步驟 2.2 上失敗了——如果我們在這里失敗了——這意味著下一個作業調用將再次處理相同的數據。乍一看你可以認為這是一個問題。但是您的消費者必須是冪等的——這意味著它必須了解消息已經被處理并跳過處理。此要求是所有消息代理都保證消息將至少傳遞一次的結果。因此,無論如何,我們的消費者都必須為重復的消息做好準備。沒問題了。
添加回答
舉報