3 回答

TA貢獻1824條經驗 獲得超6個贊
可能是下面的代碼可以完成這項工作。根據您的要求,我假設 api 和數據庫處理Single<Entity>.
private static final Object STOP = new Object();
public static void main(String[] args) {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(500, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
BehaviorSubject<Object> subject = BehaviorSubject.create();
Observable.merge(
apiService.getEntity()
.toObservable()
.doOnNext(t -> subject.onNext(STOP))
.doOnError(e -> subject.onNext(STOP))
.onErrorResumeNext(t ->
Observable.concatDelayError(database.getEntity().toObservable(),
Observable.error(t))),
database.getEntity()
.delay(300, MILLISECONDS)
.toObservable()
.takeUntil(subject)
)
.subscribe(System.out::println,
System.err::println);
Observable.timer(1, MINUTES) // just for blocking the main thread
.toBlocking()
.subscribe();
}
我沒能取出使用的Subject從因條件“如果數據庫在某種程度上會比API更慢,它不能發射其數據”和“如果API調用將是緩慢的(>?300毫秒),排出數據數據庫并仍然等待來自 api 的數據”。否則,amb()運算符將是一個很好的用途。

TA貢獻1834條經驗 獲得超8個贊
另一種解決方案可能是這個(沒有主題):
public static void main(String[] args) throws InterruptedException {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
database.getEntity()
.toObservable()
.groupJoin(apiService.getEntity()
.toObservable()
.onErrorResumeNext(
err -> Observable.concatDelayError(database.getEntity().toObservable(),
Observable.error(err))),
dbDuration -> Observable.timer(300, MILLISECONDS),
apiDuration -> Observable.never(),
(db, api) -> api.switchIfEmpty(Observable.just(db)))
.flatMap(o -> o)
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("It's the end!"));
Observable.timer(1, MINUTES) // just for blocking the main thread
.toBlocking()
.subscribe();
}
如果 API 服務在 300 毫秒 ( dbDuration -> timer(300, MILLISECONDS))內沒有發出任何內容,則從數據庫中發出實體 ( api.switchIfEmpty(db))。
如果 api 在 300 毫秒內發出某些內容,則 僅發出其Entity( api.switchIfEmpty(.))。
這似乎也如您所愿...

TA貢獻1802條經驗 獲得超6個贊
另一個更好的解決方案:
public static void main(String[] args) throws InterruptedException {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
Observable<Entity> apiServiceWithDbAsBackup =
apiService.getEntity()
.toObservable()
.onErrorResumeNext(err ->
Observable.concatDelayError(database.getEntity().toObservable(), Observable.error(err)));
Observable.amb(database.getEntity()
.toObservable()
.delay(300, MILLISECONDS)
.concatWith(apiServiceWithDbAsBackup),
apiServiceWithDbAsBackup)
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("It's the end!"));
我們使用amb()延遲到數據庫 observable 來獲取將發出的第一個。如果 api 服務出錯,我們會從數據庫中發出項目。這似乎也如您所愿...
添加回答
舉報