我使用此配置創建一個新消費者:c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": addresses, "group.id": "my_group", "auto.offset.reset": "earliest", })topic := "testTopic"if err = c.SubscribeTopics([]string{topic}, nil); err != nil { panic(err)}然后我根據以下代碼生成事件并使用一個事件:events := []map[string]string{{ "name": "Foo",}, { "name": "Bar", }, } err = p.ProduceEvent(events[0])//there is a wrapper to produce events err = p.ProduceEvent(events[1]) res, err := c.ReadMessage(100 * time.Second) time.Sleep(20 * time.Second) c.Close() 當我用 描述該組時 watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe。每一步的結果是:產生事件后: 當我消費一個事件時: 關閉消費者后: 我不明白為什么最后滯后為零!我只消耗了一個事件。這對我來說很奇怪,那Close會改變偏移量。任何線索表示贊賞。
1 回答

拉風的咖菲貓
TA貢獻1995條經驗 獲得超2個贊
ReadMessage
包裹Poll
。Poll
獲取一批消息并在本地緩沖它們。由于您已將消費者配置為自動提交偏移量,因此它將提交所有獲取的消息,甚至是那些在本地緩存且您的應用程序仍未處理的消息。這就是為什么您看到關閉消費者后沒有延遲。
librdkafka
(因此confluent-kafka-go
)沒有辦法配置max.pool.records
,所以如果你想準確控制哪些偏移量被提交,你需要禁用自動提交偏移量并使用手動提交它們StoreOffsets
:https ://github.com/confluentinc/confluent- kafka-go/issues/380#issuecomment-539903016
- 1 回答
- 0 關注
- 293 瀏覽
添加回答
舉報
0/150
提交
取消