亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

僅在需要時才在 Reactor 的 Flux 中請求下一個

僅在需要時才在 Reactor 的 Flux 中請求下一個

肥皂起泡泡 2023-08-04 19:19:47
我有一個 API,它返回實體列表,實體數量上限為 100 個。如果有更多實體,它將返回下一頁的令牌。我想創建一個通量,它返回所有實體(所有頁面),但僅在需要時(如果有請求)返回。我寫了這段代碼:class Page {    String token;    List<Object> entities;}Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {    return fct.apply(token).flatMapMany(page -> {        if (page.token == null) {            // no more pages            return Flux.fromIterable(page.entities);        }        return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));    });}它有效 - 幾乎如果我請求 99 個元素,則加載第一頁,并且我的通量包含 99 個元素。如果我請求 150 個元素,則會加載第一頁和第二頁,并且我的 Flux 包含 150 個元素。但是,如果我請求 100 個元素,則會加載第一頁和第二頁(并且我的 Flux 包含 100 個元素)。我的問題是,第二頁已加載,但我沒有請求第 101 個元素。當前行為:subscribe()=> Function is called to load page 1request(10)=> Received: 0-9request(89)=> Received: 10-98request(1)=> Received: 99=> Function is called to load page 2request(1)=> Received: 100預期是:頁面 2 的加載發生在最后一個請求之后(1)幾乎就像在某個地方進行了預取,但我看不到在哪里。有任何想法嗎?
查看完整描述

1 回答

?
郎朗坤

TA貢獻1921條經驗 獲得超9個贊

好的,我找到了。沒有預取。事實上,它是Flux.defer根據訂閱加載下一頁的,而不是根據請求加載的。


解決這個問題的快速(但骯臟的)測試是:


Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {

    return fct.apply(token).flatMapMany(page -> {

        if (page.token == null) {

            // no more pages

            return Flux.fromIterable(page.entities);

        }


        return Flux

                .fromIterable(page.entities)

                .concatWith(

                        // Flux.defer(() -> load(page.token, fct))

                        Flux.create(s -> {

                            DelegateSubscriber[] ref = new DelegateSubscriber[1];


                            s.onRequest(l -> {

                                if (ref[0] == null) {

                                    ref[0] = new DelegateSubscriber(s);

                                    load(page.token, fct).subscribe(ref[0]);

                                }

                                ref[0].request(l);

                            });

                        }));

    });

}


static class DelegateSubscriber extends BaseSubscriber<Object> {


    FluxSink<Object> delegate;


    public DelegateSubscriber(final FluxSink<Object> delegate) {

        this.delegate = delegate;

    }


    @Override

    protected void hookOnSubscribe(Subscription subscription) {

        // nothing

    }


    @Override

    protected void hookOnNext(Object value) {

        delegate.next(value);

    }


    @Override

    protected void hookOnError(Throwable throwable) {

        delegate.error(throwable);

    }

}


查看完整回答
反對 回復 2023-08-04
  • 1 回答
  • 0 關注
  • 214 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號