亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

何時使用 ConcurrentKafkaListenerContainerFactory?

何時使用 ConcurrentKafkaListenerContainerFactory?

臨摹微笑 2022-07-20 19:26:12
我是 kafka 的新手,我瀏覽了文檔,但我什么都不懂。有人可以解釋何時使用該ConcurrentKafkaListenerContainerFactory課程嗎?我已經使用了該Kafkaconsumer課程,但我看到ConcurrentKafkaListenerContainerFactory在我當前的項目中使用了該課程。請解釋它的用途。
查看完整描述

2 回答

?
翻閱古今

TA貢獻1780條經驗 獲得超5個贊

Kafka 消費者不是線程安全的。所有網絡 I/O 都發生在進行調用的應用程序的線程中。確保多線程訪問正確同步是用戶的責任。非同步訪問將導致ConcurrentModificationException.


如果消費者被分配了多個分區來獲取數據,它將嘗試同時從所有分區中消費,從而有效地為這些分區提供相同的消費優先級。但是,在某些情況下,消費者可能希望首先專注于從分配的分區的某個子集全速獲取,并且僅在這些分區幾乎沒有或沒有數據要消耗時才開始獲取其他分區。


春卡夫卡


ConcurrentKafkaListenerContainerFactory用于為帶注釋的方法創建容器@KafkaListener


MessageListenerContainer春天卡夫卡有兩個


KafkaMessageListenerContainer

ConcurrentMessageListenerContainer

KafkaMessageListenerContainer接收來自單個線程上所有主題或分區的所有消息。ConcurrentMessageListenerContainer委托給一個或多個實例KafkaMessageListenerContainer以提供多線程消費。


使用 ConcurrentMessageListenerContainer


@Bean

KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>

                    kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

                            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());

    factory.setConcurrency(3);

    factory.getContainerProperties().setPollTimeout(3000);

    return factory;

  }

它具有并發屬性。例如, container.setConcurrency(3) 創建了三個KafkaMessageListenerContainer實例。


如果TopicPartition提供了6個實例,并發為3;每個容器有兩個分區。對于五個 TopicPartition 實例,兩個容器獲得兩個分區,第三個獲得一個。如果并發大于 TopicPartition 的數量,則將并發調低,使每個容器獲得一個分區。


查看完整回答
反對 回復 2022-07-20
?
梵蒂岡之花

TA貢獻1900條經驗 獲得超5個贊

Kafka Consumer API 不是線程安全的。ConcurrentKafkaListenerContainerFactory api 提供了使用 Kafka Consumer API 的并發方式以及設置其他 kafka 消費者屬性。



查看完整回答
反對 回復 2022-07-20
  • 2 回答
  • 0 關注
  • 527 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號