亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何使用 Avro 在 Spring Kafka 1.3 中移過不可反序列化的消息

如何使用 Avro 在 Spring Kafka 1.3 中移過不可反序列化的消息

絕地無雙 2022-06-30 17:41:39
我無法升級到 Spring 5,所以我堅持使用 spring-kafka 1.3 及其有限的錯誤處理。所以我無法訪問 spring-kafka 2 中的 ConsumerAwareErrorHandler 或 SeekToCurrent 錯誤處理程序。我正在使用@KafkaListener-annotated 方法來收聽一個主題,我已將其配置io.confluent.kafka.serializers.KafkaAvroDeserializer為我的值反序列化器。問題是,如果我在主題中得到一條不是 Avro 格式的消息,那么 KafkaMessageListenerContainer 輪詢循環就會卡住。反序列化程序會在消息上引發異常,并且輪詢循環永遠不會越過它,因此下次通過循環時,它會嘗試反序列化相同的消息并繼續循環,每秒將相同的錯誤轉儲到我的日志中數千次。似乎沒有辦法獲得 NeverRetryPolicy 或邊緣方面的任何東西,但我可以factory.getContainerProperties().setErrorHandler()。不幸的是,我不確定我能從那里做什么。有什么東西可以自動連接到我的錯誤處理程序中,我可以用它來尋找錯誤時的前向 1 個偏移量嗎?不確定那是什么,文檔并沒有過多地談論您可以使用 ErrorHandler 實際做什么,我能找到的大多數示例都是針對 spring-kafka 2.X 的。就像,它不會反序列化,我對消息無能為力,它永遠不會起作用,我想避免再次重試它,而且似乎大多數 Stackoverflow 問題都是關于做相反的事情。我還看到有些人只是將 Avro 的 Deserializer 包裝在他們自己的類中,該類會吃掉異常并返回 null。這是一個更好的計劃嗎?
查看完整描述

1 回答

?
幕布斯7119047

TA貢獻1794條經驗 獲得超8個贊

問題是反序列化在 Spring 獲取數據之前就失敗了——問題出在 Kafka 本身。

在 2.2 中,我們添加了ErrorHandlingDeserializer2包裝真正的反序列化器并向偵聽器容器發送信號,以便可以將錯誤發送到錯誤處理程序。

在舊版本中,您需要編寫自己的反序列化器包裝器 - 但是,容器中沒有代碼來處理這種情況,因此您的 catch 塊需要返回一個真實的對象,該對象是向您的偵聽器發出反序列化失敗的信號.

假設您的聽眾收到Invoice. 您的 catch 塊可以創建一個子類,BadInvoice然后說您可以在偵聽器中檢測并丟棄它。

我還看到有些人只是將 Avro 的 Deserializer 包裝在他們自己的類中,該類會吃掉異常并返回 null。這是一個更好的計劃嗎?

如果您從未獲得真正的空記錄,您可以返回null,但您必須添加@Payload(required = false)到方法參數。


查看完整回答
反對 回復 2022-06-30
  • 1 回答
  • 0 關注
  • 134 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號