亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

下面CompletableFuture例子中join的調用是否阻塞進程

下面CompletableFuture例子中join的調用是否阻塞進程

BIG陽 2022-12-28 16:05:58
我試圖理解 CompletableFutures 和返回已完成期貨的調用鏈,我創建了下面的示例,它模擬了對數據庫的兩次調用。第一個方法應該是用 userIds 列表給出一個可完成的未來,然后我需要調用另一個方法提供 userId 來獲取用戶(在本例中是一個字符串)??偨Y一下:1. 獲取 ID2. 獲取與這些 ID 對應的用戶列表。我創建了簡單的方法來模擬休眠線程的響應。請檢查下面的代碼public class PipelineOfTasksExample {    private Map<Long, String> db = new HashMap<>();    PipelineOfTasksExample() {        db.put(1L, "user1");        db.put(2L, "user2");        db.put(3L, "user3");        db.put(4L, "user4");    }    private CompletableFuture<List<Long>> returnUserIdsFromDb() {        try {            Thread.sleep(500);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));    }    private CompletableFuture<String> fetchById(Long id) {        CompletableFuture<String> cfId = CompletableFuture.supplyAsync(() -> db.get(id));        try {            Thread.sleep(500);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("fetching id: " + id + " -> " + db.get(id) + " thread: " + Thread.currentThread().getName());        return cfId;    }    public static void main(String[] args) {        PipelineOfTasksExample example = new PipelineOfTasksExample();        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()                .thenCompose(listOfIds ->                        CompletableFuture.supplyAsync(                                () -> listOfIds.parallelStream()                                        .map(id -> example.fetchById(id).join())                                        .collect(Collectors.toList()                                        )                        )                );        System.out.println(result.join());    }}我的問題是,join call ( example.fetchById(id).join()) 是否破壞了進程的非阻塞性質。如果答案是肯定的,我該如何解決這個問題?
查看完整描述

1 回答

?
桃花長相依

TA貢獻1860條經驗 獲得超8個贊

你的例子有點奇怪,因為你returnUserIdsFromDb()在任何操作甚至開始之前就減慢了主線程的速度,同樣,fetchById減慢了調用者而不是異步操作的速度,這違背了異步操作的全部目的。


此外,.thenCompose(listOfIds -> CompletableFuture.supplyAsync(() -> …))您可以簡單地使用.thenApplyAsync(listOfIds -> …).


所以一個更好的例子可能是


public class PipelineOfTasksExample {

    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()

        .collect(Collectors.toMap(id -> id, id -> "user"+id));


    PipelineOfTasksExample() {}


    private static <T> T slowDown(String op, T result) {

        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));

        System.out.println(op + " -> " + result + " thread: "

            + Thread.currentThread().getName()+ ", "

            + POOL.getPoolSize() + " threads");

        return result;

    }

    private CompletableFuture<List<Long>> returnUserIdsFromDb() {

        System.out.println("trigger building the list of Ids - thread: "

            + Thread.currentThread().getName());

        return CompletableFuture.supplyAsync(

            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),

            POOL);

    }

    private CompletableFuture<String> fetchById(Long id) {

        System.out.println("trigger fetching id: " + id + " thread: "

            + Thread.currentThread().getName());

        return CompletableFuture.supplyAsync(

            () -> slowDown("fetching id: " + id , db.get(id)), POOL);

    }


    static ForkJoinPool POOL = new ForkJoinPool(2);


    public static void main(String[] args) {

        PipelineOfTasksExample example = new PipelineOfTasksExample();

        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()

            .thenApplyAsync(listOfIds ->

                listOfIds.parallelStream()

                    .map(id -> example.fetchById(id).join())

                    .collect(Collectors.toList()

                ),

                POOL

            );

        System.out.println(result.join());

    }

}

打印出類似的東西


trigger building the list of Ids - thread: main

building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads

trigger fetching id: 2 thread: ForkJoinPool-1-worker-0

trigger fetching id: 3 thread: ForkJoinPool-1-worker-1

trigger fetching id: 4 thread: ForkJoinPool-1-worker-2

fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-3, 4 threads

fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-3, 4 threads

fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-2, 4 threads

trigger fetching id: 1 thread: ForkJoinPool-1-worker-3

fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-2, 4 threads

[user1, user2, user3, user4]

乍一看,這可能是一個驚人的線程數。


答案是join()可能會阻塞線程,但是如果這種情況發生在Fork/Join池的工作線程內部,這種情況會被檢測到并啟動一個新的補償線程,以確保配置的目標并行度。


作為一種特殊情況,當使用默認的 Fork/Join 池時,實現可能會在方法內選擇新的待處理任務join(),以確保同一線程內的進度。


所以代碼總是會取得進展,join()偶爾調用也沒有錯,如果替代方案要復雜得多,但如果過度使用,就會有資源消耗過多的危險。畢竟,之所以要使用線程池,就是為了限制線程的數量。


另一種方法是盡可能使用鏈式依賴操作。


public class PipelineOfTasksExample {

    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()

        .collect(Collectors.toMap(id -> id, id -> "user"+id));


    PipelineOfTasksExample() {}


    private static <T> T slowDown(String op, T result) {

        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));

        System.out.println(op + " -> " + result + " thread: "

            + Thread.currentThread().getName()+ ", "

            + POOL.getPoolSize() + " threads");

        return result;

    }

    private CompletableFuture<List<Long>> returnUserIdsFromDb() {

        System.out.println("trigger building the list of Ids - thread: "

            + Thread.currentThread().getName());

        return CompletableFuture.supplyAsync(

            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),

            POOL);

    }

    private CompletableFuture<String> fetchById(Long id) {

        System.out.println("trigger fetching id: " + id + " thread: "

            + Thread.currentThread().getName());

        return CompletableFuture.supplyAsync(

            () -> slowDown("fetching id: " + id , db.get(id)), POOL);

    }


    static ForkJoinPool POOL = new ForkJoinPool(2);


    public static void main(String[] args) {

        PipelineOfTasksExample example = new PipelineOfTasksExample();


        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()

            .thenComposeAsync(listOfIds -> {

                List<CompletableFuture<String>> jobs = listOfIds.parallelStream()

                    .map(id -> example.fetchById(id))

                    .collect(Collectors.toList());

                return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))

                    .thenApply(_void -> jobs.stream()

                        .map(CompletableFuture::join).collect(Collectors.toList()));

                },

                POOL

            );


        System.out.println(result.join());

        System.out.println(ForkJoinPool.commonPool().getPoolSize());

    }

}

不同之處在于,首先提交所有異步作業,然后join安排調用它們的依賴操作,僅在所有作業完成后執行,因此這些join調用永遠不會阻塞。只有方法join末尾的最終調用才main可能阻塞主線程。


所以這會打印出類似的東西


trigger building the list of Ids - thread: main

building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads

trigger fetching id: 3 thread: ForkJoinPool-1-worker-1

trigger fetching id: 2 thread: ForkJoinPool-1-worker-0

trigger fetching id: 4 thread: ForkJoinPool-1-worker-1

trigger fetching id: 1 thread: ForkJoinPool-1-worker-0

fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-1, 2 threads

fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-0, 2 threads

fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-1, 2 threads

fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-0, 2 threads

[user1, user2, user3, user4]

顯示無需創建補償線程,因此線程數與配置的目標并行度相匹配。


請注意,如果實際工作是在后臺線程中而不是在fetchById方法本身中完成的,那么您現在不再需要并行流,因為沒有阻塞join()調用。對于這種情況,僅使用stream()通常會帶來更高的性能。


查看完整回答
反對 回復 2022-12-28
  • 1 回答
  • 0 關注
  • 245 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號