1 回答

TA貢獻1998條經驗 獲得超6個贊
如果在 Kafka Streams 處理期間發生任何未捕獲的異常,您的流將狀態更改為 ERROR 并停止使用發生錯誤的分區的傳入消息。您需要自己捕獲異常。重試可以通過以下兩種方式實現:1)使用 SpringRetryTemplate調用外部微服務(但請記住,您將延遲使用來自特定分區的消息),或 2)將失敗的消息推送到另一個主題以供以后重新處理(如您所建議的)
更新時間kafka-streams 2.8.0
因為,您可以使用方法自動替換kafka-streams 2.8.0失敗的流線程(由未捕獲的異常引起)。有關詳細信息,請查看Kafka Streams 特定的未捕獲異常處理程序KafkaStreamsvoid setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
添加回答
舉報