亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

當ForkJoinPool結束時

當ForkJoinPool結束時

慕俠2389804 2023-06-21 13:15:03
我試圖弄清楚所有 ForkJoinPool 線程何時完成其任務。我編寫了這個測試應用程序(我使用 System.out 因為它只是一個快速測試應用程序,并且沒有錯誤檢查/處理):public class TestForkJoinPoolEnd {    private static final Queue<String> queue = new LinkedList<>();    private static final int MAX_SIZE = 5000;    private static final int SPEED_UP = 100;    public static void main(String[] args) {        ForkJoinPool customThreadPool = new ForkJoinPool(12);        customThreadPool.submit(                () -> makeList()                        .parallelStream()                        .forEach(TestForkJoinPoolEnd::process));        enqueue("Theard pool started up");        int counter = MAX_SIZE + 1;        while (!customThreadPool.isTerminating()) {            String s = dequeue();            if (s != null) {                System.out.println(s);                counter--;            }            try {                TimeUnit.MILLISECONDS.sleep(1);            } catch (InterruptedException e) {            }        }        System.out.println("counter = " + counter);        System.out.println("isQuiescent = " + customThreadPool.isQuiescent()     + " isTerminating " +                "= " + customThreadPool.isTerminating() + " isTerminated = "                + customThreadPool.isTerminated() + " isShutdown =" +     customThreadPool.isShutdown());    }    static List<String> makeList() {        return Stream.generate(() -> makeString())                .limit(MAX_SIZE)                .collect(Collectors.toList());    }    static String makeString() {        int leftLimit = 97; // letter 'a'        int rightLimit = 122; // letter 'z'        int targetStringLength = 10;        Random random = new Random();    }}這段代碼被卡住并且永遠無法完成。如果我將 while 循環的條件更改為!customThreadPool.isQuiescent()它會終止循環,計數器和隊列大小設置為 1。我應該使用什么來確定線程何時完成?
查看完整描述

1 回答

?
LEATH

TA貢獻1936條經驗 獲得超7個贊

AnExecutorService不會僅僅因為一項作業(及其子作業)完成而自行終止。線程池背后的整個想法是可重用。

因此,只有當應用程序調用shutdown()它時,它才會終止。

您可以使用它isQuiescent()來查找是否沒有待處理的作業,這僅在所有提交的作業都屬于您的特定任務時才有效。使用返回的 future 來submit檢查實際工作的完成情況要簡潔得多。

在任何一種情況下,排隊任務的完成狀態都不會說明您正在輪詢的隊列。當您了解提交結束時,您仍然需要檢查隊列中是否有未決元素。

此外,建議使用線程安全BlockingQueue實現而不是裝飾LinkedListwithsynchronized塊。加上一些需要清理的其他內容,代碼將如下所示:

public class TestForkJoinPoolEnd {

    private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();

    private static final int MAX_SIZE = 5000;

    private static final int SPEED_UP = 100;


    public static void main(String[] args) {

        ForkJoinPool customThreadPool = new ForkJoinPool(12);

        ForkJoinTask<?> future = customThreadPool.submit(

            () -> makeList()

                    .parallelStream()

                    .forEach(TestForkJoinPoolEnd::process));

        QUEUE.offer("Theard pool started up");


        int counter = MAX_SIZE + 1;

        while (!future.isDone()) try {

            String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);

            if (s != null) {

                System.out.println(s);

                counter--;

            }

        } catch (InterruptedException e) {}


        for(;;) {

            String s = QUEUE.poll();

            if (s == null) break;

            System.out.println(s);

            counter--;

        }

        System.out.println("counter = " + counter);

        System.out.println("isQuiescent = " + customThreadPool.isQuiescent()     + " isTerminating " +

                "= " + customThreadPool.isTerminating() + " isTerminated = "

                + customThreadPool.isTerminated() + " isShutdown =" +     customThreadPool.isShutdown());


        customThreadPool.shutdown();

    }


    static List<String> makeList() {

        return IntStream.range(0, MAX_SIZE)

            .mapToObj(i -> makeString())

            .collect(Collectors.toList());

    }


    static String makeString() {

        int targetStringLength = 10;

        Random random = new Random();

        StringBuilder buffer = new StringBuilder(targetStringLength);

        for (int i = 0; i < targetStringLength; i++) {

            int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';

            buffer.append((char) randomLimitedInt);

        }

        return buffer.toString();

    }


    static int toSeed(String s) {

        return s.chars().sum() / SPEED_UP;

    }


    static void process(String s) {

        long start = System.nanoTime();

        try {

            TimeUnit.MILLISECONDS.sleep(toSeed(s));

        } catch (InterruptedException e) {


        }

        long end = System.nanoTime();

        QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");

    }

}

如果您sleep在接收端的呼叫應該模擬一些工作量而不是等待新項目,您也可以使用


int counter = MAX_SIZE + 1;

while (!future.isDone()) {

    String s = QUEUE.poll();

    if (s != null) {

        System.out.println(s);

        counter--;

    }

    try {

        TimeUnit.MILLISECONDS.sleep(1);

    } catch (InterruptedException e) {}

}

但邏輯沒有改變。future.isDone()返回后true,我們必須重新檢查隊列中待處理的元素。我們只保證不會有新的項目到達,而不保證隊列已經空了。


作為旁注,該makeString()方法可以進一步改進


static String makeString() {

    int targetStringLength = 10;

    ThreadLocalRandom random = ThreadLocalRandom.current();

    StringBuilder buffer = new StringBuilder(targetStringLength);

    for (int i = 0; i < targetStringLength; i++) {

        int randomLimitedInt = random.nextInt('a', 'z' + 1);

        buffer.append((char)randomLimitedInt);

    }

    return buffer.toString();

}

甚至


static String makeString() {

    int targetStringLength = 10;

    return ThreadLocalRandom.current()

        .ints(targetStringLength, 'a', 'z'+1)

        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)

        .toString();

}


查看完整回答
反對 回復 2023-06-21
  • 1 回答
  • 0 關注
  • 137 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號