我正在嘗試運行 2 個訂閱了 2 個不同主題的消費者。兩個消費者程序每次運行一個時都運行正常,但同時運行時,其中一個消費者總是顯示異常:org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.我遵循了建議max.pool.size,將 2設置為session.timeout.ms30000,1000heartbeat.interval.ms下面是我的消費者函數,這兩個文件的函數是相同的,只是主題名稱更改為Test2,并且我在同時運行的 2 個不同類中運行這兩個函數。 public void consume() { //Kafka consumer configuration settings List<String> topicNames = new ArrayList<String>(); topicNames.add("Test1"); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put("heartbeat.interval.ms", "1000"); props.put("max.poll.records", "2"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(topicNames);由于此錯誤,記錄不會在Kafka主題中提交。我該如何克服這個錯誤?
1 回答

倚天杖
TA貢獻1828條經驗 獲得超3個贊
在您的情況下,您需要為消費者分配不同的組 ID。您正在使用相同的組 ID 創建兩個消費者(這是可以的),但是調用 subscribe 兩次是不行的。
您可以一次運行一個消費者,因為您只調用 subscribe 一次。
如果您需要任何進一步的幫助,請告訴我。很高興能幫助你。
添加回答
舉報
0/150
提交
取消