1 回答

TA貢獻1796條經驗 獲得超4個贊
這個人為的例子可能會更清楚:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
.flatMap(x -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName())));
這將給出類似的東西:
Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
或者,換句話說,您的反應式存儲庫沒有在同一個調度程序上發布,這就是您看到您所做的行為的原因?!癠p until the next occurrence of publishOn()”并不意味著下次您的代碼調用publishOn()- 它也可以在您的任何調用中的任何發布者中flatMap(),您將無法控制。
添加回答
舉報