亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

我可以為 subscribeOn 方法和異步任務使用相同的執行程序嗎

我可以為 subscribeOn 方法和異步任務使用相同的執行程序嗎

慕斯709654 2021-08-13 15:53:59
我有一個簡單的問題,假設我有一個如下所示的課程:import lombok.Value;import java.nio.file.Path;@Valueclass ImageResizeRequest {    private DownloadedImage downloadedImage;    private ImageSize imageSize;    private Path destinationLocation;}上面的類代表負責將圖像調整為給定大小的單個任務。我有很多要求將此圖像調整為許多不同的尺寸。@RequiredArgsConstructorclass ImageResizeService {    private final Executor executor;    Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {        return Flux.fromIterable(requests)                .flatMap(this::resize)                .collectList()                .subscribeOn(Schedulers.fromExecutor(executor));    }    private Mono<ImageResizeResult> resize(ImageResizeRequest request) {        return Mono.fromFuture(CompletableFuture.supplyAsync(resizeTask(request), executor));    }    private Supplier<ImageResizeResult> resizeTask(ImageResizeRequest request) {        return () -> {            //TODO add image resize logic for example ImageMagick by Im4Java...            /** code below call ImageMagick library             ConvertCmd cmd = new ConvertCmd();             IMOperation op = new IMOperation();             op.quality(100d);             op.addImage(request.getDestinationLocation().toString());             cmd.run(op);             */            //TODO add logic!!!            return new ImageResizeResult(null, null, null, null);        };    }}我的問題是:如何在 Project Reactor 中實現負責調整圖像大小的并行獨立任務?如果沒有項目反應器,我將使用 CompletableFuture 列表:private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {    CompletableFuture<Void> allDoneFuture =        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));    return allDoneFuture.thenApply(v ->            futures.stream().                    map(future -> future.join()).                    collect(Collectors.<T>toList())    );}具有指定的執行程序服務。此外,在我的示例中,我在 subscribeOn 方法和 supplyAsync 中使用相同的執行程序 - 是個好主意嗎?
查看完整描述

2 回答

?
FFIVE

TA貢獻1797條經驗 獲得超6個贊

不要不斷地Scheduler從重新創建 ,ExecutorService而是努力將它直接包裝在構造函數中。


您根本不需要CompletableFuture,并且subscribeOn應該應用于內部flatMap以可能為每個調整大小任務選擇單獨的線程(它從每個 Flux 應用到的池中選擇一個線程):


class ImageResizeService {


  private final Executor executor; //TODO prefer an ExecutorService if possible

  private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)


  Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {

    //we get the requests on IO thread

    return Flux.fromIterable(requests)

            //for each request, perform asynchronous resize...

            .flatMap(r -> Mono

                //... by converting the resizeTask Callable to a Mono

                .fromCallable(r -> resizeTask(r).get())

                //... and making sure it executes on the executor

                .subscribeOn(scheduler)

            )

            .collectList();

  }

}

為了實現真正的并行化,您還有另一種選擇parallel().runOn()::


Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {

    //we get the requests on IO thread

    return Flux.fromIterable(requests)

            //divide into N workloads

            //the executor _should_ be capable of this degree of parallelisation:

            .parallel(NUMBER_OF_DESIRED_THREADS)

            //actually tell to run each workload on a thread picked from executor

            .runOn(scheduler) 

            //here the workload are already running on their dedicated thread,

            //we can afford to block it and thus apply resize in a simpler `map`

            .map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed

            //go back to a `Flux` sequence for collection into list

            .sequential()

            .collectList();

}


查看完整回答
反對 回復 2021-08-13
  • 2 回答
  • 0 關注
  • 253 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號