How to catch deserialization errors in Kafka-Spring?

白豬掌柜的 2022-11-02 10:35:54
I am building an application that consumes Kafka messages. To catch deserialization exceptions, I followed the Spring documentation on deserialization error handling and tried the failedDeserializationFunction approach. This is my consumer configuration class:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

    /* Error handling */
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
    consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);
    return consumerProps;
}

@Bean
public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(NTCMessageBody.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

This is the BiFunction provider:

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {

    @Override
    public NTCMessageBody apply(byte[] t, Headers u) {
        return new NTCBadMessageBody(t);
    }
}

When I send just one corrupted message to the topic, I get this error (in a loop):

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value

I understand that ErrorHandlingDeserializer2 should delegate to the NTCBadMessageBody type and continue consuming. I also see (in debug mode) that it never enters the constructor of the NTCBadMessageBody class.

4 Answers

qq_遁去的一_1

ErrorHandlingDeserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and the raw bytes. When you use a record-level MessageListener, if the key or value contains a DeserializationException, the container's ErrorHandler is called with the failed ConsumerRecord. When you use a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value of a particular record is a DeserializationException.
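By the way, in the configuration in your question the consumerFactory() bean passes explicit StringDeserializer/JsonDeserializer instances to DefaultKafkaConsumerFactory; those instances take precedence over the ErrorHandlingDeserializer2 properties set in consumerConfigs(), which is likely why the failed-deserialization function never runs. A minimal sketch of a factory that relies on the properties alone (assuming your consumerConfigs() map from the question):

@Bean
public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
    // No explicit deserializer instances here, so the ErrorHandlingDeserializer2
    // configured via VALUE_DESERIALIZER_CLASS_CONFIG in consumerConfigs() is actually used.
    // Because the JsonDeserializer delegate is now created from configuration, the
    // consumerConfigs() map would also need JsonDeserializer.VALUE_DEFAULT_TYPE and
    // JsonDeserializer.TRUSTED_PACKAGES entries for NTCMessageBody.
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}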

So, given the code you are using (a record-level MessageListener), you just need to add an ErrorHandler to the container.

Handling exceptions

If your error handler implements this interface, you can, for example, adjust the offsets accordingly. To reset the offset so that the failed message is replayed, you could do something like the following; note, however, that these are simplistic implementations and you will probably want more checking in your error handler.

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}
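This bean is a listener-level error handler, so it is wired up by name on the listener itself. A minimal usage sketch (the topic name is just a placeholder):

@KafkaListener(topics = "some-topic", errorHandler = "listen3ErrorHandler")
public void listen3(NTCMessageBody body) {
    // process the message
}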

Or you can do a custom implementation, as in this example:


@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setErrorHandler(new ErrorHandler() {

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?, ?> consumer) {
            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }
    });

    return factory;
}


青春有我

Use an ErrorHandlingDeserializer.

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and the raw bytes. When you use a record-level MessageListener, if the key or value contains a DeserializationException, the container's ErrorHandler is called with the failed ConsumerRecord. When you use a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value of a particular record is a DeserializationException.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in ErrorHandlingDeserializers configured with the proper delegates. Alternatively, you can use consumer configuration properties, which the ErrorHandlingDeserializer uses to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; the property value can be a class or a class name.
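A minimal sketch of the constructor-based wiring mentioned above (KafkaEvent, servers, and groupId are the same as in the full property-based configuration that follows):

@Bean
public ConsumerFactory<String, KafkaEvent> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    // Wrap the real deserializers in ErrorHandlingDeserializer instances directly,
    // instead of configuring the delegates through properties.
    return new DefaultKafkaConsumerFactory<>(config,
            new ErrorHandlingDeserializer<>(new StringDeserializer()),
            new ErrorHandlingDeserializer<>(new JsonDeserializer<>(KafkaEvent.class)));
}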

package com.mypackage.app.config;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.mypackage.app.model.kafka.message.KafkaEvent;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import lombok.extern.slf4j.Slf4j;

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${listener.group-id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(((exception, data) -> {
            /*
             * Here you can do your custom handling; this just logs the error, the same as
             * the default error handler does. If you only want to log, you need not
             * configure an error handler here -- the default handler does it for you.
             * Generally, you would persist the failed records to a DB for tracking.
             */
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        }));

        return factory;
    }

    @Bean
    public ConsumerFactory<String, KafkaEvent> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
                "com.mypackage.app.model.kafka.message.KafkaEvent");
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /*
         * The retry policy sets the number of retry attempts and which
         * exceptions should (and should not) be retried.
         */
        retryTemplate.setRetryPolicy(retryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

        // the boolean value in the map determines whether the exception should be retried
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);
        exceptionMap.put(ListenerExecutionFailedException.class, true);

        return new SimpleRetryPolicy(3, exceptionMap, true);
    }
}
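A listener would then reference this container factory by its bean name. A minimal sketch (the topic name is only a placeholder):

@KafkaListener(topics = "my-topic", containerFactory = "ListenerFactory")
public void onEvent(KafkaEvent event) {
    log.info("Received event: {}", event);
}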


慕桂英546537

The answer above can run into problems if the topic name contains characters such as "-". So I modified the same logic with a regular expression.


import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaErrHandler implements ErrorHandler {

    /**
     * Prevents the consumer from getting stuck in a loop on a serialization error.
     *
     * @param e        the exception thrown while polling
     * @param consumer the consumer to seek past the failed record
     */
    private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {
        String p = ".*partition (.*) at offset ([0-9]*).*";
        Pattern r = Pattern.compile(p);

        Matcher m = r.matcher(e.getMessage());

        if (m.find()) {
            int idx = m.group(1).lastIndexOf("-");
            String topics = m.group(1).substring(0, idx);
            // skip the "-" separator between topic and partition
            int partition = Integer.parseInt(m.group(1).substring(idx + 1));
            int offset = Integer.parseInt(m.group(2));

            TopicPartition topicPartition = new TopicPartition(topics, partition);

            consumer.seek(topicPartition, (offset + 1));

            log.info("Skipped message with offset {} from partition {}", offset, partition);
        }
    }

    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
        log.error("Error in process with Exception {} and the record is {}", e, record);

        if (e instanceof SerializationException)
            seekSerializeException(e, consumer);
    }

    @Override
    public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
            MessageListenerContainer container) {
        log.error("Error in process with Exception {} and the records are {}", e, records);

        if (e instanceof SerializationException)
            seekSerializeException(e, consumer);
    }

    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> record) {
        log.error("Error in process with Exception {} and the record is {}", e, record);
    }
}

Finally, use the error handler in the configuration:


@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(macdStatusConsumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setErrorHandler(new KafkaErrHandler());

    return factory;
}

However, parsing the error string to get the topic, partition, and offset is not recommended. If anyone has a better solution, please post it here.


Smart貓小萌

In my factory I added a CommonErrorHandler:


factory.setCommonErrorHandler(new KafkaMessageErrorHandler());

and created KafkaMessageErrorHandler as follows:


class KafkaMessageErrorHandler implements CommonErrorHandler {

    @Override
    public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
        manageException(thrownException, consumer);
    }

    @Override
    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        manageException(thrownException, consumer);
    }

    private void manageException(Exception ex, Consumer<?, ?> consumer) {
        log.error("Error polling message: " + ex.getMessage());
        if (ex instanceof RecordDeserializationException) {
            RecordDeserializationException rde = (RecordDeserializationException) ex;
            // seek past the record that failed to deserialize and commit the new position
            consumer.seek(rde.topicPartition(), rde.offset() + 1L);
            consumer.commitSync();
        } else {
            log.error("Exception not handled");
        }
    }
}
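Note that CommonErrorHandler and RecordDeserializationException are only available in more recent spring-kafka and kafka-clients releases than those assumed by the earlier answers; if your dependency versions have them, this approach avoids parsing the exception message text to recover the topic, partition, and offset.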

