亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用 Spring Boot 從 Kafka 隊列消費時獲取序列化異常

使用 Spring Boot 從 Kafka 隊列消費時獲取序列化異常

米琪卡哇伊 2022-12-15 15:14:56
我正在使用帶有 Kafka 的 Spring Boot 來使用來自 Kafka 隊列的消息。但是,如果 Jackson 解析 payload 有任何錯誤。該消息仍然卡住,并不斷重新嘗試消費,始終給出解析異常。我嘗試在 Kafka 配置中使用 ErrorHandlingDeserializer2 并映射錯誤處理程序,但問題仍然存在。@Configuration@EnableKafkapublic class KafkaConfig {    @Value("${spring.kafka.bootstrap-servers}")    private String bootstrapServers;    @Bean    public Map<String, Object> consumerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);        props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);        props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);        props.put(ConsumerConfig.GROUP_ID_CONFIG, "${connecto.group-id}");        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);        return props;    }    @Bean    public ConsumerFactory<String, UserDto> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(                consumerConfigs(),                new StringDeserializer(),                new JsonDeserializer<>(UserDto.class));    }    @Bean    public ConcurrentKafkaListenerContainerFactory<String, UserDto> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, UserDto> factory =                new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setErrorHandler(new MosaicKafkaErrorHandler());        return factory;    }    @Bean    public KafkaTemplate kafkaTemplate()     {        return new KafkaTemplate<>(producerFactory());    }}
查看完整描述

1 回答

?
開滿天機

TA貢獻1786條經驗 獲得超13個贊

為了解決這個問題,2.2 版引入了ErrorHandlingDeserializer2. 此反序列化器委托給真正的反序列化器(鍵或值)。如果委托未能反序列化記錄內容,則ErrorHandlingDeserializer2返回一個空值和一個DeserializationException包含原因和原始字節的標頭。當您使用記錄級別MessageListener時,如果 ConsumerRecord 包含DeserializationException鍵或值的標頭,則會使用失敗的 ConsumerRecord 調用容器的 ErrorHandler。記錄不會傳遞給偵聽器。


但是,由于您使用的是早期版本,因此可以使用以下 hack。


     @Override

        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,

                MessageListenerContainer container) {

            thrownException.printStackTrace();

            if (thrownException instanceOf SerializationException){

                String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];

                String topics = s.split("-")[0];

                int offset = Integer.valueOf(s.split("offset ")[1]);

                int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);


                TopicPartition topicPartition = new TopicPartition(topics, partition);

                consumer.seek(topicPartition, offset + 1);  

               }

            }


查看完整回答
反對 回復 2022-12-15
  • 1 回答
  • 0 關注
  • 265 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號