當我使用新的組 ID 注冊消費者時,前 N 次輪詢調用不返回任何內容。我想測試當我調用服務時,會發布一個 Kafka 事件。問題是每當我更改 groupId 時,前 N 個民意調查都不會返回任何內容。我了解 Kafka 在輪詢時首先注冊消費者,但我發現注冊消費者所需的輪詢次數(時間)過于隨機。消費者配置:Properties props = new Properties();props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC_NAME));腳步:在每次測試之前,我consumer.poll(Duration.ofSeconds(5))只是為了確保消費者已注冊并設置了偏移量。我調用服務并斷言響應。如果我使用 UI 檢查 Kafka,則會發布事件。我打電話consumer.poll(Duration.ofSeconds(5)),希望能收到一些記錄。這是失敗的一步。有沒有辦法確保第二次投票總是返回記錄?我試圖讓第一次投票持續 1 分鐘(我已經認為 5 秒對于等待每次測試來說太長了),它有時仍然有效,有時無效。謝謝。
1 回答

一只萌萌小番薯
TA貢獻1795條經驗 獲得超7個贊
它不適用于您的“新 groupId”的原因是您處于“最新”模式。
默認值為“最新”,您需要處于“最早”模式或使用您的“新 groupId”首次輪詢或為此主題的此“新 groupId”提交偏移量。
您需要將“groupId”注冊到主題,而不是消費者。
添加回答
舉報
0/150
提交
取消