昨天,我從日志中發現,在 Kafka 組協調員發起組重新平衡后,kafka 正在重新消費一些消息。這些消息已在兩天前被消耗(從日志中確認)。日志中還報告了另外兩個重新平衡,但它們不再重新使用消息。那么為什么第一次重新平衡會導致重新消費消息呢?存在哪些問題?我正在使用 golang kafka 客戶端。這是代碼config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest并且我們在聲明消息之前處理消息,所以我們似乎正在為 kafka 使用“至少發送一次”策略。我們在一臺機器上有三個代理,在另一臺機器上只有一個消費者線程(go 例程)。對于這種現象有什么解釋嗎?我認為這些消息肯定已經提交了,因為它們是在兩天前被消耗的,或者為什么 kafka 會在沒有提交的情況下保留偏移量超過兩天?消費代碼示例:func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for message := range claim.Messages() { realHanlder(message) // consumed data here session.MarkMessage(message, "") // mark offset}return nil}添加:應用程序重新啟動后發生重新平衡。還有另外兩次重新啟動并沒有導致重新啟動卡夫卡的配置log.retention.check.interval.ms=300000log.retention.hours=168zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0delete.topic.enable = trueauto.create.topics .enable=假
1 回答

ITMISS
TA貢獻1871條經驗 獲得超8個贊
通過閱讀golang saram客戶端和kafka服務器的源碼,最終找到原因如下
消費者組偏移保留時間為24 小時,這是 kafka 默認設置,而日志保留時間是我們明確設置的 7 天。
我的服務器應用程序運行在測試環境中,很少有人可以訪問,這意味著kafka生產者產生的消息可能很少,然后消費者組也沒有多少消息可以消費,因此消費者可能很長時間不會提交任何偏移量。
當消費偏移量超過24小時未更新時,由于偏移量配置,kafka代理/協調器將從分區中刪除消費偏移量。下次 saram 從 kafka 代理查詢偏移量在哪里時,客戶端當然什么也得不到。請注意,我們使用sarama.OffsetOldest作為初始值,然后 sarama 客戶端將從 kafka Broker 保留的消息開頭開始消費消息,這會導致消息重新消費,并且這種情況很可能發生,因為日志保留期為 7天
- 1 回答
- 0 關注
- 161 瀏覽
添加回答
舉報
0/150
提交
取消