最近,我開始學習使用kafka工作。我正在從事的項目使用sarama。為了閱讀我使用的消息ConsumerGroup。foo如果返回,我需要在一段時間后再次閱讀該消息false。如何才能做到這一點?func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { if ok := foo(message); ok { session.MarkMessage(message, "") } else { // ??? } } return nil}
1 回答

白衣染霜花
TA貢獻1796條經驗 獲得超10個贊
Setup()
您可以通過在您的消費者組的回調中包含以下內容來將消費者組的偏移量重置為較舊的偏移量:
func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error { sess.ResetOffset(topic, partition, offset, "") return nil}
您也可以通過控制臺實現相同的目的:
kafka-consumer-groups \ --bootstrap-server localhost:9092 \ --group my-consumer-group \ --topic myTopicName \ --reset-offsets \ --to-offfset 100 \ --execute
- 1 回答
- 0 關注
- 160 瀏覽
添加回答
舉報
0/150
提交
取消