亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

將數據庫和網絡調用與 RxJava2 結合

將數據庫和網絡調用與 RxJava2 結合

慕哥9229398 2021-08-19 21:31:55
我有 2 個數據源:數據庫(緩存)和 api,我需要將它們組合成一個流。我知道我可以簡單地使用 concatArray 或類似的東西,但我想實現更復雜的行為:最多可發射 2 個元素的可觀察流。它將在開始時訂閱這兩個來源。如果 api 調用足夠快(<~300 毫秒),它將僅從中發出數據并完成流。如果 api 調用會很慢(>~300ms),從數據庫發出數據并仍然等待來自 api 的數據如果 api 調用不會成功,則從數據庫發出數據并發出錯誤。如果數據庫以某種方式比 api 慢,它就不能發出它的數據(流完成解決了這個問題)我用以下代碼完成了它:   public Observable<Entity> getEntity() {    final CompositeDisposable disposables = new CompositeDisposable();    return Observable.<Entity>create(emitter -> {        final Entity[] localEntity = new Entity[1];        //database call:        disposables.add(database.getEntity()                .subscribeOn(schedulers.io())                .doOnSuccess(entity -> localEntity[0] = entity) //saving our entity because                                                         //apiService can emit error before 300 ms                 .delay(300, MILLISECONDS)                .subscribe((entity, throwable) -> {                    if (entity != null && !emitter.isDisposed()) {                        emitter.onNext(entity);                    }                }));        //network call:        disposables.add(apiService.getEntity()                .subscribeOn(schedulers.io())                .onErrorResumeNext(throwable -> {                    return Single.<Entity>error(throwable) //we will delay error here                            .doOnError(throwable1 -> {                                if (localEntity[0] != null) emitter.onNext(localEntity[0]); //api error, emit localEntity                            })                            .delay(200, MILLISECONDS, true); //to let it emit localEntity before emitting error                })                .subscribe(entity -> {                    emitter.onNext(entity);                     emitter.onComplete(); //we got entity from api, so we can complete the stream                }, emitter::onError));    })代碼有點笨重,我在 observable 中創建了 observables,我認為這是不好的。但是這樣我就可以全局訪問發射器,這使我能夠以我想要的方式控制主流(發射數據、成功、錯誤)。有沒有更好的方法來實現這一目標?我很想看一些代碼示例。謝謝!
查看完整描述

3 回答

?
慕妹3242003

TA貢獻1824條經驗 獲得超6個贊

可能是下面的代碼可以完成這項工作。根據您的要求,我假設 api 和數據庫處理Single<Entity>.


private static final Object STOP = new Object();


public static void main(String[] args) {

    Database database = new Database(Single.just(new Entity("D1")));

    ApiService apiService = new ApiService(Single.just(new Entity("A1")));

    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(500, MILLISECONDS));

    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));

    BehaviorSubject<Object> subject = BehaviorSubject.create();


    Observable.merge(

        apiService.getEntity()

                  .toObservable()

                  .doOnNext(t -> subject.onNext(STOP))

                  .doOnError(e -> subject.onNext(STOP))

                  .onErrorResumeNext(t ->

                                        Observable.concatDelayError(database.getEntity().toObservable(),

                                                                    Observable.error(t))),

        database.getEntity()

                .delay(300, MILLISECONDS)

                .toObservable()

                .takeUntil(subject)

    )

    .subscribe(System.out::println, 

               System.err::println);


    Observable.timer(1, MINUTES) // just for blocking the main thread

              .toBlocking()

              .subscribe();

}

我沒能取出使用的Subject從因條件“如果數據庫在某種程度上會比API更慢,它不能發射其數據”和“如果API調用將是緩慢的(>?300毫秒),排出數據數據庫并仍然等待來自 api 的數據”。否則,amb()運算符將是一個很好的用途。


查看完整回答
反對 回復 2021-08-19
?
MMMHUHU

TA貢獻1834條經驗 獲得超8個贊

另一種解決方案可能是這個(沒有主題):


public static void main(String[] args) throws InterruptedException {

    Database database = new Database(Single.just(new Entity("D1")));

    ApiService apiService = new ApiService(Single.just(new Entity("A1")));

    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));

    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));


    database.getEntity()

            .toObservable()

            .groupJoin(apiService.getEntity()

                                 .toObservable()

                                 .onErrorResumeNext(

                                    err -> Observable.concatDelayError(database.getEntity().toObservable(),

                                                                       Observable.error(err))),

                       dbDuration -> Observable.timer(300, MILLISECONDS),

                       apiDuration -> Observable.never(),

                       (db, api) -> api.switchIfEmpty(Observable.just(db)))

            .flatMap(o -> o)

            .subscribe(System.out::println,

                       Throwable::printStackTrace,

                       () -> System.out.println("It's the end!"));


    Observable.timer(1, MINUTES) // just for blocking the main thread

              .toBlocking()

              .subscribe();

}

如果 API 服務在 300 毫秒 ( dbDuration -> timer(300, MILLISECONDS))內沒有發出任何內容,則從數據庫中發出實體 ( api.switchIfEmpty(db))。


如果 api 在 300 毫秒內發出某些內容,則 僅發出其Entity( api.switchIfEmpty(.))。


這似乎也如您所愿...


查看完整回答
反對 回復 2021-08-19
?
呼啦一陣風

TA貢獻1802條經驗 獲得超6個贊

另一個更好的解決方案:


public static void main(String[] args) throws InterruptedException {

    Database database = new Database(Single.just(new Entity("D1")));

    ApiService apiService = new ApiService(Single.just(new Entity("A1")));

    // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));

    // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));


    Observable<Entity> apiServiceWithDbAsBackup =

            apiService.getEntity()

                      .toObservable()

                      .onErrorResumeNext(err -> 

                            Observable.concatDelayError(database.getEntity().toObservable(), Observable.error(err)));


    Observable.amb(database.getEntity()

                           .toObservable()

                           .delay(300, MILLISECONDS)

                           .concatWith(apiServiceWithDbAsBackup),

                   apiServiceWithDbAsBackup)

              .subscribe(System.out::println,

                         Throwable::printStackTrace,

                         () -> System.out.println("It's the end!"));

我們使用amb()延遲到數據庫 observable 來獲取將發出的第一個。如果 api 服務出錯,我們會從數據庫中發出項目。這似乎也如您所愿...



查看完整回答
反對 回復 2021-08-19
  • 3 回答
  • 0 關注
  • 234 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號