2 回答

TA貢獻1797條經驗 獲得超6個贊
不要不斷地Scheduler從重新創建 ,ExecutorService而是努力將它直接包裝在構造函數中。
您根本不需要CompletableFuture,并且subscribeOn應該應用于內部flatMap以可能為每個調整大小任務選擇單獨的線程(它從每個 Flux 應用到的池中選擇一個線程):
class ImageResizeService {
private final Executor executor; //TODO prefer an ExecutorService if possible
private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)
Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
//we get the requests on IO thread
return Flux.fromIterable(requests)
//for each request, perform asynchronous resize...
.flatMap(r -> Mono
//... by converting the resizeTask Callable to a Mono
.fromCallable(r -> resizeTask(r).get())
//... and making sure it executes on the executor
.subscribeOn(scheduler)
)
.collectList();
}
}
為了實現真正的并行化,您還有另一種選擇parallel().runOn()::
Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
//we get the requests on IO thread
return Flux.fromIterable(requests)
//divide into N workloads
//the executor _should_ be capable of this degree of parallelisation:
.parallel(NUMBER_OF_DESIRED_THREADS)
//actually tell to run each workload on a thread picked from executor
.runOn(scheduler)
//here the workload are already running on their dedicated thread,
//we can afford to block it and thus apply resize in a simpler `map`
.map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed
//go back to a `Flux` sequence for collection into list
.sequential()
.collectList();
}
添加回答
舉報