亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在回調中發送記錄時如何在spring-kafka中修復“在xxx ms后更新元數據失敗”

在回調中發送記錄時如何在spring-kafka中修復“在xxx ms后更新元數據失敗”

幕布斯6054654 2022-06-04 14:55:23
spring-kafka 無法在回調中發送記錄    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {        @Override        public void onFailure(Throwable ex) {            log.error("log error...");        }        @Override        public void onSuccess(SendResult<String, String> result) {            kafkaTemplate.send("anotherTopic", "key", "data");        }    });當我在 onSuccess() 中調用 kafkaTemplate.send() 時,Kafka 拋出“更新元數據失敗”,這是意料之中的
查看完整描述

1 回答

?
莫回無

TA貢獻1865條經驗 獲得超7個贊

看起來您無法在回調線程上執行生產者操作kafka-producer-network-thread- 可能是生產者代碼中的一些死鎖 - 等待獲取將使用同一線程的元數據,因此它超時。


您可能需要第二個KafkaTemaplate(和生產者工廠,因為默認工廠總是返回相同的生產者)。


或者只是在不同的線程上執行第二次發送......


@SpringBootApplication

public class So54492871Application {


    private static final ExecutorService exec = Executors.newSingleThreadExecutor();


    public static void main(String[] args) {

        SpringApplication.run(So54492871Application.class, args);

    }


    @Bean

    public NewTopic topic1() {

        return new NewTopic("so54492871-1", 1, (short) 1);

    }


    @Bean

    public NewTopic topic2() {

        return new NewTopic("so54492871-2", 1, (short) 1);

    }


    @Bean

    public ApplicationRunner runner(KafkaTemplate<String, String> template) {

        return args -> {

            ListenableFuture<SendResult<String, String>> future = template.send("so54492871-1", "foo");

            future.addCallback(result -> {

                System.out.println(Thread.currentThread().getName() + ":" + result);

                exec.execute(() -> {

                    ListenableFuture<SendResult<String, String>> future2 = template.send("so54492871-2", "bar");

                    future2.addCallback(result2 -> {

                        System.out.println(Thread.currentThread().getName() + ":" + result2);

                    }, ex -> {

                        System.out.println(ex.getMessage());

                    });

                });

            }, ex -> {

                System.out.println(ex.getMessage());

            });

            System.in.read();

        };

    }


}


查看完整回答
反對 回復 2022-06-04
  • 1 回答
  • 0 關注
  • 129 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號