亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在 KafkaBootstrapConfiguration 中覆蓋 KafkaListener

如何在 KafkaBootstrapConfiguration 中覆蓋 KafkaListener

偶然的你 2023-06-21 16:20:08
我需要向 KafkaListenerEndpointRegistry 添加一些邏輯 - 我想為每個主題注冊額外的偵聽器(我想創建具有不同輪詢時間的重試主題消費者鏈),我使用 @Listener 注釋創建。為此,我想嘗試覆蓋 registerListenerContainer 方法并在那里實現邏輯。我做的第一步是添加與 KafkaBootstrapConfiguration 相同的默認配置。但在那之后,我所有的測試都失敗了,出于某種原因,我的聽眾沒有消費任何東西。如果我不添加豆,一切都會正常。@Configuration@EnableKafkapublic class CustomKafkaBootstrapConfiguration {  @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)  public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {    return new KafkaListenerEndpointRegistry(){        @Override        public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {            //i need to add logic here            super.registerListenerContainer(endpoint, factory);        }    };  }}
查看完整描述

1 回答

?
慕田峪9158850

TA貢獻1794條經驗 獲得超7個贊

我剛剛復制了您的覆蓋,一切都按預期進行。


@SpringBootApplication

public class So57674940Application {


    public static void main(String[] args) {

        SpringApplication.run(new Class<?>[] { So57674940Application.class, So57674940ApplicationConfig.class }, args);

    }


    @KafkaListener(id = "so57674940", topics = "so57674940")

    public void listen(String in) {

        System.out.println(in);

    }


}


@Configuration

@EnableKafka

class So57674940ApplicationConfig {


    @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)

    public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {

        return new KafkaListenerEndpointRegistry() {

            @Override

            public void registerListenerContainer(KafkaListenerEndpoint endpoint,

                    KafkaListenerContainerFactory<?> factory) {

                // i need to add logic here

                System.out.println("in custom registry");

                super.registerListenerContainer(endpoint, factory);

            }

        };

    }


}


in custom registry

2019-08-27 11:20:36.251  INFO 33460 --- [o57674940-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so57674940-0]



查看完整回答
反對 回復 2023-06-21
  • 1 回答
  • 0 關注
  • 188 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號