如果在階段期間發生任何故障,我正在嘗試RestartFlow在 Akka Streams javadsl 中使用來重新啟動我的流程階段,但它似乎并沒有重新啟動流程,而只是丟棄了消息。我已經看到了:Akka Streams 中的 RestartFlow not working as expected,但我使用的是 2.5.19 版本,所以應該修復它?我都試過了RestartFlow.onFailuresWithBackoff,RestartFlow.withBackoff但都沒有奏效。我也嘗試過使用整個 Actor 系統主管策略,但這似乎只是攔截了異常,因此它不會從流程中拋出,而且似乎沒有提供我想要的退避和最大重試策略。流:public Consumer.DrainingControl<Done> stream() { return Consumer.committableSource(consumerSettings, Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX + ConfigKeys.CONSUMER_TOPIC))) .via(RestartFlow.onFailuresWithBackoff( Duration.ofSeconds(1), // min backoff Duration.ofSeconds(2), // max backoff, 0.2, // adds 20% "noise" to vary the intervals slightly 10, // limits the amount of restarts to 10 this::dispatchMessageFlow)) .via(Committer.flow(CommitterSettings.create(system))) .toMat(Sink.ignore(), Keep.both()) .mapMaterializedValue(Consumer::createDrainingControl) .run(mat);}我看到過一次異常,akka 表示它將由于失敗而重新啟動圖表,但在那之后就沒有別的了。根據我的理解,我應該再看 10 次。消費者繼續收聽新消息,因此看起來消息剛剛被丟棄。如果有人可以幫助我指出正確的方向,我將不勝感激。
1 回答

蕭十郎
TA貢獻1815條經驗 獲得超13個贊
它的工作方式有點不同。長話短說 - 如果發生錯誤,消息將被丟棄,但源/流將重新啟動,而不會殺死整個流。它在RestartFlow.onFailuresWithBackoff 文檔中有所描述:
重新啟動過程本質上是有損的,因為取消和發送消息之間沒有協調。來自包裝流任一端的終止信號將導致另一端終止,并且任何傳輸中的消息都將丟失。在退避期間,此 Flow 將背壓。
添加回答
舉報
0/150
提交
取消