我正在使用 Spring Kafka 2.2.7,我已經配置@EnableKafka并kafkaListenerContainerFactory使用它@KafkaListener來消費消息,一切都按預期工作。我想添加一個RecordInterceptor來記錄所有消耗的消息,但發現很難配置它。文檔指出 RecordInterceptor 可以在容器上設置,但是我不確定如何獲取容器的實例。從2.2.7版本開始,可以向監聽器容器添加RecordInterceptor;它將在調用偵聽器之前調用,以允許檢查或修改記錄。 @Bean public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(createConsumerFactory()); factory.setConcurrency(consumerCount); return factory; }我瀏覽了 Spring 文檔,但沒有找到解決方案,這似乎是一件簡單的事情,但也許我錯過了一些東西。在這方面的任何幫助將不勝感激。提前致謝。
如何將 RecordInterceptor 設置為 ConcurrentKafkaListener
慕碼人8056858
2024-01-05 15:18:59