亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何將輪詢 api 轉換為反應流

如何將輪詢 api 轉換為反應流

慕哥6287543 2023-03-31 09:34:32
假設我有一個具有以下簽名的函數:class Item {  String name;  Long id;}public Flux<Item> getNew(long id);getNew()返回在 id (0..N) 之后添加的項目流。那么如何將其變成無限流呢?所以像這樣:public Flux<Item> observe(long id) {    return Flux.interval(Duration.ofSeconds(1)).             flatMap(counter -> getNew(id)); // <-- how to use last value from getNew flux as the new id                }我能夠做到的唯一方法是使用某種類型的狀態變量:   public Flux<Long> observe(long id) {     final AtomicLong last = new AtomicLong(id);     return Flux.interval(Duration.ofSeconds(1)).         flatMap(l -> getNew(last.get())).         doOnNext(last::set);       }    有沒有更慣用的方法來做到這一點?我試圖為此創建生成器,但我不知道如何實現它。
查看完整描述

1 回答

?
富國滬深

TA貢獻1790條經驗 獲得超9個贊

如果您可以通過檢查識別出最后Item發出的信號,則可以使用運算符:getNew.expand

? ?public Flux<Item> observe(long id) {

? ? ? ? return getNew(id)

? ? ? ? ? ? ? ? .expand(item -> isLast(item)

? ? ? ? ? ? ? ? ? ? ? ? ? getNew(item.id)

? ? ? ? ? ? ? ? ? ? ? ? : Flux.empty());

? ? }

? ? /**

? ? ?* @return true if the given item is the last item emitted by getNew

? ? ?*/

? ? private boolean isLast(Item item) {

? ? ? ? return // ... snip ...

? ? }

如果您不能通過檢查來識別最后一個Item,那么您將不得不使用狀態變量。雖然,我建議使用.deferand.repeat而不是?.interval...


?public Flux<Item> observe(long id) {

? ? ? ? final AtomicLong nextStartId = new AtomicLong(id);

? ? ? ? return Flux.defer(() -> getNew(nextStartId.get()))

? ? ? ? ? ? ? ? .doOnNext(item -> nextStartId.set(item.id))

? ? ? ? ? ? ? ? .repeat();

? ? }

反對使用的主要原因.interval是:

如果沒有及時產生需求,將發出 onError 信號

因此,如果 API 花費的時間太長,或者處理結果的時間太長,流將以錯誤結束。對于較長的間隔,這可能不是問題,但對于相對較短的間隔(例如您的示例中的 1 秒),這可能是一個問題。

如果你想在每次重復迭代之前延遲,那么你可以使用.repeatWhen, 帶有 reactor-extra 的Repeat固定退避。這將為您提供“固定延遲”語義,而不是“固定間隔”。


查看完整回答
反對 回復 2023-03-31
  • 1 回答
  • 0 關注
  • 136 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號