1 回答

TA貢獻1786條經驗 獲得超13個贊
為了解決這個問題,2.2 版引入了ErrorHandlingDeserializer2. 此反序列化器委托給真正的反序列化器(鍵或值)。如果委托未能反序列化記錄內容,則ErrorHandlingDeserializer2返回一個空值和一個DeserializationException包含原因和原始字節的標頭。當您使用記錄級別MessageListener時,如果 ConsumerRecord 包含DeserializationException鍵或值的標頭,則會使用失敗的 ConsumerRecord 調用容器的 ErrorHandler。記錄不會傳遞給偵聽器。
但是,由于您使用的是早期版本,因此可以使用以下 hack。
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
thrownException.printStackTrace();
if (thrownException instanceOf SerializationException){
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
consumer.seek(topicPartition, offset + 1);
}
}
添加回答
舉報