下面是一個包含 3 個 Go 例程的服務,用于處理來自 Kafka 的消息:通道 1 和通道 2 是 Go 中的無緩沖數據通道。通道就像一個排隊機制。Goroutine-1 從 kafka 主題讀取消息,在驗證消息后將其消息負載拋出到通道-1 上。Goroutine-2 從通道 1 讀取并處理有效負載,并將處理后的有效負載拋出通道 2。Goroutine-3 從通道 2 讀取數據,并將處理后的有效負載封裝到 http 數據包中,并向另一個服務執行 http 請求(使用 http 客戶端)。上述流程中的漏洞:在我們的例子中,由于服務之間的網絡連接不良或遠程服務未準備好接受來自Go-routine3的http請求(http客戶端超時),處理失敗,因此,上述服務會丟失該消息(已從Kafka主題中讀取)。戈魯廷-1目前訂閱來自卡夫卡的消息,而沒有向卡夫卡發送確認(通知戈魯廷-3已成功處理特定消息)正確性優先于性能。如何確保每條消息都得到成功處理?
2 回答

白豬掌柜的
TA貢獻1893條經驗 獲得超10個贊
例如,通過新的通道-3將戈魯廷-3的反饋添加到戈魯廷-1。戈魯廷-1將阻塞,直到它得到頻道-3的確認。
// in gorouting 1
channel1 <- data
select {
case <-channel3:
case <-ctx.Done(): // or smth else to prevent deadlock
}
...
// in gorouting 3
data := <-channel2
for {
if err := sendData(data); err == nil {
break
}
}
channel3<-struct{}{}

撒科打諢
TA貢獻1934條經驗 獲得超2個贊
為了確保正確性,您需要在處理成功完成后提交(=確認)消息。
對于處理未成功完成的情況 - 通常,您需要自己實現重試機制。
這應該特定于您的用例,但通常您將消息拋回專用的Kafka重試主題(您創建),添加睡眠并再次處理消息。如果在 x 次后處理失敗 - 則將消息拋出到 DLQ(=死信隊列)。
你可以在這里閱讀更多:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/
- 2 回答
- 0 關注
- 105 瀏覽
添加回答
舉報
0/150
提交
取消