亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在 Kafka Stream 中處理消息時發生錯誤時重新處理消息

在 Kafka Stream 中處理消息時發生錯誤時重新處理消息

holdtom 2022-12-21 13:06:21
我有一個基于 Kafka Streams 的簡單 Spring 應用程序,它使用來自傳入主題的消息,進行map轉換并打印此消息。KStream像這樣配置@Beanpublic KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,         PrintAction printAction, String topicName) {    KStream<String, JsonNode> source = builder.stream(topicName,                Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));    // @formatter:off    source        .map(myTransformer)        .foreach(printAction);    // @formatter:on    return source;}在內部MyTransformer,我正在調用此時可能會關閉的外部微服務。如果調用失敗(通常是拋出錯誤RuntimeException),我就無法進行轉換。這里的問題是,如果在之前的處理過程中發生任何錯誤,有什么方法可以再次重新處理 Streams 應用程序中的消息?根據我目前的研究,這里沒有辦法這樣做,我唯一的可能是將消息推送到死信主題中,并在將來再次失敗時嘗試處理它我再次將其推送到 DLT 中并以這種方式重試.
查看完整描述

1 回答

?
米琪卡哇伊

TA貢獻1998條經驗 獲得超6個贊

如果在 Kafka Streams 處理期間發生任何未捕獲的異常,您的流將狀態更改為 ERROR 并停止使用發生錯誤的分區的傳入消息。您需要自己捕獲異常。重試可以通過以下兩種方式實現:1)使用 SpringRetryTemplate調用外部微服務(但請記住,您將延遲使用來自特定分區的消息),或 2)將失敗的消息推送到另一個主題以供以后重新處理(如您所建議的)


更新時間kafka-streams 2.8.0


因為,您可以使用方法自動替換kafka-streams 2.8.0失敗的流線程(由未捕獲的異常引起)。有關詳細信息,請查看Kafka Streams 特定的未捕獲異常處理程序KafkaStreamsvoid setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD


kafkaStreams.setUncaughtExceptionHandler(ex -> {

    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);

    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;

});


查看完整回答
反對 回復 2022-12-21
  • 1 回答
  • 0 關注
  • 116 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號