亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

當其中之一拋出異常時如何阻止可運行對象的執行

當其中之一拋出異常時如何阻止可運行對象的執行

千萬里不及你 2023-07-19 17:05:51
我有一組元素,對于每個元素,我都執行方法,將其作為 Runnable 傳遞給 CompletableFuture.runAsync() 。在執行過程中,可能需要停止整個計算,因此我在執行方法之前檢查一些條件。如果計算應該停止,那么我會拋出一個異常,該異常在 CompletableFuture 之外處理。我想阻止所有 Runnables 的執行,這些 Runnables 在拋出異常后執行。因此,換句話說,當其中任何一個 CompletableFuture 拋出異常時,我不想等待所有 CompletableFuture 完成。Set elements = ...Executor executor = Executors.newFixedThreadPool(N);try {    CompletableFuture.allOf(elements.stream().map(e - > CompletableFuture.runAsync(() - > {        if (shouldStop()) {            throw new MyException();        }        myMethod(e);    }, executor)).toArray(CompletableFuture[]::new)).join()} catch (CompletionException e) {    ...}
查看完整描述

3 回答

?
慕田峪9158850

TA貢獻1794條經驗 獲得超7個贊

發生異常時全部取消即可。障礙在于您在創建它們時并不了解所有這些,并且您不想多次執行此工作。這可以通過創建一個新的、空的CompletableFuture第一個(我們稱之為f1)來解決。然后,像以前一樣創建 future,但f1.cancel在if(shouldStop()) { … }語句中插入對 的調用。然后,在創建所有 future 后,將一個操作鏈接起來,將所有 future 取消f1。


取消將達到兩個目的,它將阻止尚未開始的可運行對象的執行,并且將使未來通過不allOf等待仍在進行的評估完成來返回。


由于取消 aCompletableFuture與使用 a 異常完成它沒有什么不同CancellationException,并且在出現多個異常的情況下,由 返回的 futureallOf將報告任意一個,我們可以使用自completeExceptionally定義來MyException代替,以確保報告的異常不會是次要的CancellationException。


一個獨立的例子是:


static final AtomicInteger STOP = new AtomicInteger(2);

static boolean shouldStop() {

    return STOP.getAndDecrement() <= 0;

}

static final int N = 10;

public static void main(String[] args) {

    Set<Integer> elements = IntStream.range(0, 100).boxed().collect(Collectors.toSet());

    ExecutorService executor = Executors.newFixedThreadPool(N);

    try {

        CompletableFuture<?> cancelAll = new CompletableFuture<>();

        CompletableFuture<?>[] all = elements.stream()

            .map(e ->

                CompletableFuture.runAsync(() -> {

                    System.out.println("entered "+e);

                    if(shouldStop()) {

                        RuntimeException myException = new RuntimeException("stopped");

                         // alternatively cancelAll.cancel(false);

                        cancelAll.completeExceptionally(myException);

                        throw myException;

                    }

                    System.out.println("processing "+e);

                }, executor))

            .toArray(CompletableFuture<?>[]::new);

        cancelAll.whenComplete((value,throwable) -> {

            if(throwable != null) {

                for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);

            }

        });

        CompletableFuture.allOf(all).join();

    } catch (CompletionException e) {

        e.printStackTrace();

    }

    executor.shutdown();

}

這會打印類似的東西


entered 3

entered 8

entered 4

entered 6

entered 1

entered 9

entered 0

entered 7

entered 5

entered 2

entered 10

processing 8

processing 3

java.util.concurrent.CompletionException: java.lang.RuntimeException: stopped

    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)

    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)

    at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1423)

    at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1144)

    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)

    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)

    at CompletableFutureTest.lambda$main$3(CompletableFutureTest.java:34)

    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)

    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)

    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)

    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)

    at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:26)

    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)

    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

    at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.lang.RuntimeException: stopped

    at CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:25)

    ... 4 more

顯示由于并發性,一些可運行對象已經在運行,但一旦傳播取消,就不會啟動后續執行。


請注意,由于cancelAll只會在異常情況下完成或根本不會完成,cancelAll.whenComplete((value,throwable) -> { for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable); });因此您可以將鏈接操作簡化為,但這只是編碼風格是否保留冗余檢查的問題。


您還可以向處理步驟添加延遲,以確保allOf(all).join()在滿足停止條件時不會等待完成。


還可以將一個操作鏈接到返回的 future,runAsync該操作將在任何異常完成時取消所有操作,而不僅僅是顯式停止。但是,必須注意返回表示通過 安排的操作的原始未來,runAsync而不是返回的未來whenComplete。


CompletableFuture<?> cancelAll = new CompletableFuture<>();

CompletableFuture<?>[] all = elements.stream()

    .map(e -> {

        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {

            System.out.println("entered "+e);

            if(shouldStop()) throw new RuntimeException("stopped");

            System.out.println("processing "+e);

        }, executor);

        cf.whenComplete((value,throwable) -> {

            if(throwable != null) cancelAll.completeExceptionally(throwable);

        });

        return cf;

    })

    .toArray(CompletableFuture<?>[]::new);

cancelAll.whenComplete((value,throwable) -> {

    for(CompletableFuture<?> cf: all) cf.completeExceptionally(throwable);

});

CompletableFuture.allOf(all).join();


查看完整回答
反對 回復 2023-07-19
?
青春有我

TA貢獻1784條經驗 獲得超8個贊

我對 s 沒有太多(當然沒有?。┙涷?code>CompletableFuture,但我確實有一個建議(可能有幫助?)你可以在CompletableFuture.allOf(elements.stream().maptry 塊外部聲明 lambda 嗎?這樣,在嘗試內部之前,所有期貨都不會運行。但它們仍然可以被 catch 塊訪問。在其中您可以完成cancel所有這些。



查看完整回答
反對 回復 2023-07-19
?
至尊寶的傳說

TA貢獻1789條經驗 獲得超10個贊

您應該做的主要事情是interrupt希望更快地終止所有正在運行的任務,這意味著這些任務可能需要檢查中斷,以便它們知道停止正在做的事情并更快地終止。


此外,您可以在主線程中繼續并讓它們在后臺終止,而不是等待被中斷的任務實際終止。


public static void main(String[] args) {

    List<Integer> elements = Arrays.asList(5, null, 6, 3, 4); // these elements will fail fast

    // List<Integer> elements = Arrays.asList(5, 2, 6, 3, 4); // these elements will succeed


    try {

        CountDownLatch latch = new CountDownLatch(elements.size());

        ExecutorService executor = Executors.newFixedThreadPool(elements.size());

        elements.stream().forEach(e -> {

            executor.execute(() -> {

                try {

                    doSomething(e);

                    latch.countDown();

                } catch (Exception ex) {

                    // shutdown executor ASAP on exception, read the docs for `shutdownNow()`

                    // it will interrupt all tasks in the executor

                    if (!executor.isShutdown()) {

                        executor.shutdownNow();

                    }

                    for (int i = (int) latch.getCount(); i >= 0; i--) {

                        latch.countDown();

                    }

                    // log the exception

                    ex.printStackTrace(System.out);

                }

            });

        });

        latch.await();

        if (executor.isShutdown()) {

            System.out.println("Tasks failed! Terminating remaining tasks in the background.");

        } else {

            executor.shutdown();

            System.out.println("Tasks succeeded!");

        }

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

}


public static void doSomething(Integer sleepSecs) {

    // You will want to check for `interrupted()` throughout the method you want to be able to cancel

    if (Thread.interrupted()) {

        System.out.println(Thread.currentThread().getName() + " interrupted early");

        return;

    }


    if (sleepSecs == null) {

        System.out.println(Thread.currentThread().getName() + " throwing exception ");

        throw new RuntimeException();

    }


    try {

        System.out.println(Thread.currentThread().getName() + " started interruptable sleep for " + sleepSecs + "s");

        Thread.sleep(sleepSecs * 1000);

        System.out.println(Thread.currentThread().getName() + " finished interruptable sleep" + sleepSecs + "s");

    } catch (InterruptedException e) {

        System.out.println(Thread.currentThread().getName() + " interrupted sleep!");

    }


    // ...possibly some part of the task that can't be skipped, such as cleanup


    System.out.println(Thread.currentThread().getName() + " complete!");

}


查看完整回答
反對 回復 2023-07-19
  • 3 回答
  • 0 關注
  • 171 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號