2 回答

TA貢獻1811條經驗 獲得超4個贊
就像 Lior Chaga 在他的評論中所說的那樣,您正在手動將主題分區分配給您的消費者。這不是執行此操作的推薦方法。最重要的是,您的所有消費者似乎都在使用完全相同的 groupID。利用這種結構,有兩個線程消費,如果消費者的至少一個有一個特定的消息,沒有其他線程會得到一個。如果您希望所有的消費者線程都獲得自己的“一組”消息,而不會相互中斷,那么您需要給它們不同的group.ids。
要訂閱主題以便它為您處理自動重新平衡,然后消費,您應該執行以下操作(取自下面鏈接的 KafkaConsumer javadoc):
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Kafka 官方 javadocs 有更詳細的解釋:https ://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
添加回答
舉報