亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何使每個消息處理成功?

如何使每個消息處理成功?

Go
精慕HU 2022-09-26 20:24:22
下面是一個包含 3 個 Go 例程的服務,用于處理來自 Kafka 的消息:通道 1 和通道 2 是 Go 中的無緩沖數據通道。通道就像一個排隊機制。Goroutine-1 從 kafka 主題讀取消息,在驗證消息后將其消息負載拋出到通道-1 上。Goroutine-2 從通道 1 讀取并處理有效負載,并將處理后的有效負載拋出通道 2。Goroutine-3 從通道 2 讀取數據,并將處理后的有效負載封裝到 http 數據包中,并向另一個服務執行 http 請求(使用 http 客戶端)。上述流程中的漏洞:在我們的例子中,由于服務之間的網絡連接不良或遠程服務未準備好接受來自Go-routine3的http請求(http客戶端超時),處理失敗,因此,上述服務會丟失該消息(已從Kafka主題中讀取)。戈魯廷-1目前訂閱來自卡夫卡的消息,而沒有向卡夫卡發送確認(通知戈魯廷-3已成功處理特定消息)正確性優先于性能。如何確保每條消息都得到成功處理?
查看完整描述

2 回答

?
白豬掌柜的

TA貢獻1893條經驗 獲得超10個贊

例如,通過新的通道-3將戈魯廷-3的反饋添加到戈魯廷-1。戈魯廷-1將阻塞,直到它得到頻道-3的確認。


// in gorouting 1

channel1 <- data

select {

    case <-channel3:

    case <-ctx.Done(): // or smth else to prevent deadlock 

}

...

// in gorouting 3

data := <-channel2

for {

    if err := sendData(data); err == nil {

        break

    }

}

channel3<-struct{}{}


查看完整回答
反對 回復 2022-09-26
?
撒科打諢

TA貢獻1934條經驗 獲得超2個贊

為了確保正確性,您需要在處理成功完成后提交(=確認)消息。
對于處理未成功完成的情況 - 通常,您需要自己實現重試機制。
這應該特定于您的用例,但通常您將消息拋回專用的Kafka重試主題(您創建),添加睡眠并再次處理消息。如果在 x 次后處理失敗 - 則將消息拋出到 DLQ(=死信隊列)。
你可以在這里閱讀更多:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/

查看完整回答
反對 回復 2022-09-26
  • 2 回答
  • 0 關注
  • 105 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號