我當前的設置具有以下配置:@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> myKafkaListenerContainerFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setMessageConverter(stringJsonMessageConverter()); return factory;}其中 stringJsonMessageConverter 有@Beanpublic StringJsonMessageConverter stringJsonMessageConverter() { return new StringJsonMessageConverter(objectMapper());}使用我的對象映射器@Beanpublic ObjectMapper objectMapper() { return new ObjectMapper() .registerModule(new JavaTimeModule()) .registerModule(myCustomJacksonModules()) .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(ACCEPT_SINGLE_VALUE_AS_ARRAY, true) .configure(WRITE_DATES_AS_TIMESTAMPS, false);}使用此配置,我可以發布為:...headers = new MessageHeaders(singletonMap(TOPIC, topic));Foo foo = ....Message<?> message = new GenericMessage<>(foo, headers);kafkaTemplate.send(message);并消費為:@KafkaListener(topics = "myTopic", groupId = "g1", containerFactory = "myKafkaListenerContainerFactory")public void onMessageReceived(Foo foo) { ... works with foo here}如果 Foo 是具體類,這在本例中工作正常。但是,如果 Foo 是抽象的,并且作為消息發送的是它的子類F1 extends Foo; F2 extends Foo ...... // publisherheaders = new MessageHeaders(singletonMap(TOPIC, topic));F1 f1 = ....Message<?> message = new GenericMessage<>(f1, headers);kafkaTemplate.send(message);... // Listener@KafkaListener(topics = "myTopic", groupId = "g1", containerFactory = "myKafkaListenerContainerFactory")public void onMessageReceived(Foo foo) { ... wont work}在我的消費者中將 Foo 聲明為類型會失敗,但這會起作用:@KafkaListener(topics = "myTopic", groupId = "g1", containerFactory = "myKafkaListenerContainerFactory")public void onMessageReceived(Map<String,Object> fooAsMap) { ... this works too}有沒有辦法在我發送消息時指定目標類型?
1 回答

有只小跳蛙
TA貢獻1824條經驗 獲得超8個贊
配置StringJsonMessageConverter
帶有自定義DefaultJackson2JavaTypeMapper
的地方有setTypePrecedence(TypePrecedence.TYPE_ID)
。
還要根據發送類型 id 標頭設置類型映射以映射到所需的類(僅當生產者的類與消費者的類不同時才需要)。
添加回答
舉報
0/150
提交
取消