1 回答
TA貢獻1865條經驗 獲得超7個贊
看起來您無法在回調線程上執行生產者操作kafka-producer-network-thread- 可能是生產者代碼中的一些死鎖 - 等待獲取將使用同一線程的元數據,因此它超時。
您可能需要第二個KafkaTemaplate(和生產者工廠,因為默認工廠總是返回相同的生產者)。
或者只是在不同的線程上執行第二次發送......
@SpringBootApplication
public class So54492871Application {
private static final ExecutorService exec = Executors.newSingleThreadExecutor();
public static void main(String[] args) {
SpringApplication.run(So54492871Application.class, args);
}
@Bean
public NewTopic topic1() {
return new NewTopic("so54492871-1", 1, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("so54492871-2", 1, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
ListenableFuture<SendResult<String, String>> future = template.send("so54492871-1", "foo");
future.addCallback(result -> {
System.out.println(Thread.currentThread().getName() + ":" + result);
exec.execute(() -> {
ListenableFuture<SendResult<String, String>> future2 = template.send("so54492871-2", "bar");
future2.addCallback(result2 -> {
System.out.println(Thread.currentThread().getName() + ":" + result2);
}, ex -> {
System.out.println(ex.getMessage());
});
});
}, ex -> {
System.out.println(ex.getMessage());
});
System.in.read();
};
}
}
添加回答
舉報
