亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

具有相同組 ID 的 Kafka 消費者線程使用相同的記錄

具有相同組 ID 的 Kafka 消費者線程使用相同的記錄

繁花不似錦 2021-11-17 15:26:19
我需要在多個線程中使用來自 Kafka 分區的記錄,每個線程上都有唯一的記錄進行處理。我有以下代碼,我不知道是什么錯誤public class ConsumerThread implements Runnable {    public String name;    public ConsumerThread(String name){        this.name = name;    }    public Properties getDefaultProperty(){        Properties prop = new Properties();        prop.setProperty("group.id", "4");        prop.put("enable.auto.commit", "false");        prop.put("auto.offset.reset", "earliest");        prop.setProperty("bootstrap.servers", "localhost:9092");        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        prop.setProperty("max.poll.records","150");        return prop;    }    public void run() {        TopicPartition tp = new TopicPartition("my.topic", 0);        KafkaConsumer consumer = new KafkaConsumer(getDefaultProperty());        ArrayList tpList = new ArrayList<TopicPartition>();        tpList.add(tp);        consumer.assign(tpList);        ConsumerRecords poll = consumer.poll(1000);        Iterator it = poll.iterator();        consumer.commitAsync();        while(it.hasNext()){            ConsumerRecord cr = (ConsumerRecord) it.next();            System.out.println("From "+this.name+" : "+cr.value());        }        consumer.close();        System.out.println("Thread Exiting "+this.name);    }}結果From Thread1 : produced_0From Thread1 : produced_1From Thread1 : produced_2From Thread1 : produced_3...From Thread1 : produced_136From Thread2 : produced_0From Thread2 : produced_1From Thread2 : produced_2From Thread2 : produced_3...預期的 :From Thread1 : produced_0From Thread1 : produced_1From Thread1 : produced_2From Thread1 : produced_3...From Thread1 : produced_136From Thread2 : produced_4From Thread2 : produced_5From Thread2 : produced_6From Thread2 : produced_137
查看完整描述

2 回答

?
波斯汪

TA貢獻1811條經驗 獲得超4個贊

就像 Lior Chaga 在他的評論中所說的那樣,您正在手動將主題分區分配給您的消費者。這不是執行此操作的推薦方法。最重要的是,您的所有消費者似乎都在使用完全相同的 groupID。利用這種結構,有兩個線程消費,如果消費者的至少一個有一個特定的消息,沒有其他線程會得到一個。如果您希望所有的消費者線程都獲得自己的“一組”消息,而不會相互中斷,那么您需要給它們不同的group.ids。


要訂閱主題以便它為您處理自動重新平衡,然后消費,您應該執行以下操作(取自下面鏈接的 KafkaConsumer javadoc):


 consumer.subscribe(Arrays.asList("foo", "bar"));

 while (true) {

     ConsumerRecords<String, String> records = consumer.poll(100);

     for (ConsumerRecord<String, String> record : records)

         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

 }

Kafka 官方 javadocs 有更詳細的解釋:https ://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


查看完整回答
反對 回復 2021-11-17
?
大話西游666

TA貢獻1817條經驗 獲得超14個贊

只有使用kafka 消費者的subscribe方法才能將分區自動分配給消費者組。但是,您使用assign特定主題分區,因此您承擔將特定分區分配給不同消費者的責任(但您始終使用相同的分區0,因此所有消費者都從同一主題分區消費)。


查看完整回答
反對 回復 2021-11-17
  • 2 回答
  • 0 關注
  • 319 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號