使 Spring Kafka 重新加載 SSL 上下文的推薦方法是什么?我需要在不停機的情況下將新證書插入到我的 Kafka 生產者使用的信任庫中。但是我發現,一旦應用程序啟動并創建了 Kafka 生產者,就會創建并緩存 SSLContext 的實例。有一種方法可以重新配置它,但到目前為止我發現的唯一方法是通過調用 destroy 方法DefaultKafkaProducerFactory(在證書更新后)銷毀任何現有的生產者,這會導致任何后續調用KafkaTemplate.send強制創建一個新的生產者,這反過來重新加載 SSL 上下文。我覺得這就像用大錘來解決這個問題,可能會有更優雅的解決方案。我還注意到,如果在發送消息時調用destroy,我會得到以下異常,當我們無法承受丟失任何事件時,它看起來不是很積極。java.util.concurrent.CompletionException: org.apache.kafka.common.KafkaException: Producer closed while send in progress at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1592) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)Caused by: org.apache.kafka.common.KafkaException: Producer closed while send in progress at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:826) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444) at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372) at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:190) at org.springframework.kafka.core.KafkaOperations$send.call(Unknown Source) at com.example.event.publisher.kafka.KafkaEventPublisher.doPublish(KafkaEventPublisher.groovy:57)
2 回答

開滿天機
TA貢獻1786條經驗 獲得超13個贊
Spring Kafka 2.6.5、Kafka 2.4.1,我改用 KafkaProducerFactory.reset()。
@Autowired
private final KafkaTemplate<String, byte[]> kafkaTemplate;
private void reloadProducer() {
kafkaTemplate.getProducerFactory().reset();
}
下次調用 send() 時,Spring 將使用新證書重新創建一個新的 KafkaConsumer 品牌。
添加回答
舉報
0/150
提交
取消