1 回答

TA貢獻1790條經驗 獲得超9個贊
如果您可以通過檢查識別出最后Item
發出的信號,則可以使用運算符:getNew
.expand
? ?public Flux<Item> observe(long id) {
? ? ? ? return getNew(id)
? ? ? ? ? ? ? ? .expand(item -> isLast(item)
? ? ? ? ? ? ? ? ? ? ? ? ? getNew(item.id)
? ? ? ? ? ? ? ? ? ? ? ? : Flux.empty());
? ? }
? ? /**
? ? ?* @return true if the given item is the last item emitted by getNew
? ? ?*/
? ? private boolean isLast(Item item) {
? ? ? ? return // ... snip ...
? ? }
如果您不能通過檢查來識別最后一個Item
,那么您將不得不使用狀態變量。雖然,我建議使用.defer
and.repeat
而不是?.interval
...
?public Flux<Item> observe(long id) {
? ? ? ? final AtomicLong nextStartId = new AtomicLong(id);
? ? ? ? return Flux.defer(() -> getNew(nextStartId.get()))
? ? ? ? ? ? ? ? .doOnNext(item -> nextStartId.set(item.id))
? ? ? ? ? ? ? ? .repeat();
? ? }
反對使用的主要原因.interval
是:
如果沒有及時產生需求,將發出 onError 信號
因此,如果 API 花費的時間太長,或者處理結果的時間太長,流將以錯誤結束。對于較長的間隔,這可能不是問題,但對于相對較短的間隔(例如您的示例中的 1 秒),這可能是一個問題。
如果你想在每次重復迭代之前延遲,那么你可以使用.repeatWhen
, 帶有 reactor-extra 的Repeat
固定退避。這將為您提供“固定延遲”語義,而不是“固定間隔”。
添加回答
舉報