亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Reactor:擴展一個 ParallelFlux

Reactor:擴展一個 ParallelFlux

搖曳的薔薇 2022-06-04 10:58:34
我有一組需要擴展的項目,所以我選擇 reactor 是因為它的反應能力,因為擴展需要 IO 操作。這是一段工作代碼:public Flux<Item> expand(List<Item> unprocessedItems) {  return Flux.fromIterable(unprocessedItems)    .expandDeep(this::expandItem);}請注意,這this::expandItem是一個阻塞操作(多個數據庫查詢,一些計算,...)?,F在我希望這個擴展是平行的,但據我所知.expand(),.expandDeep()并且只是班級的成員,Flux而不是ParallelFlux班級的成員。我嘗試在通話之前添加.publishOn()and ,但沒有運氣。.subscribeOn().expand()這是我第一次使用反應器,但我沒有看到任何阻止并行擴展的技術問題,有什么辦法嗎?API是否丟失或我錯過了什么?
查看完整描述

2 回答

?
狐的傳說

TA貢獻1804條經驗 獲得超3個贊

是的,你是對的ParallelFluxhas not .expand()and .expandDeep()methods,但我可以使用其他方式,創建具有擴展方法的附加 Publisher 并將其傳遞給你的ParallelFlux,如下所示:


public static void main(String[] args) {      


    Function<Node, Flux<Node>> expander =

        node -> Flux.fromIterable(node.children);


    List<Node> roots = createTestNodes();


    Flux.fromIterable(roots)

        .parallel(4)

        .runOn(Schedulers.parallel())

        .flatMap(node -> Flux.just(node).expandDeep(expander))

        .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))

        .sequential()

        .subscribe();


    try {

        Thread.sleep(500);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

    System.out.println("finished");


}

我的測試數據:


static final class Node {

    final String name;

    final List<Node> children;


    Node(String name, Node... nodes) {

        this.name = name;

        this.children = new ArrayList<>();

        children.addAll(Arrays.asList(nodes));

    }


    @Override

    public String toString() {

        return name;

    }

}


static List<Node> createTestNodes() {

    return new Node("root",

        new Node("1",

            new Node("11")

        ),

        new Node("2",

            new Node("21"),

            new Node("22",

                new Node("221")

            )

        ),

        new Node("3",

            new Node("31"),

            new Node("32",

                new Node("321")

            ),

            new Node("33",

                new Node("331"),

                new Node("332",

                    new Node("3321")

                )

            )

        ),

        new Node("4",

            new Node("41"),

            new Node("42",

                new Node("421")

            ),

            new Node("43",

                new Node("431"),

                new Node("432",

                    new Node("4321")

                )

            ),

            new Node("44",

                new Node("441"),

                new Node("442",

                    new Node("4421")

                ),

                new Node("443",

                    new Node("4431"),

                    new Node("4432")

                )

            )

        )

    ).children;

}

結果:


Time: 1549296674522 thread: parallel-4 value: 4

Time: 1549296674523 thread: parallel-4 value: 41

Time: 1549296674523 thread: parallel-2 value: 2

Time: 1549296674523 thread: parallel-2 value: 21

Time: 1549296674523 thread: parallel-3 value: 3

Time: 1549296674523 thread: parallel-3 value: 31

Time: 1549296674523 thread: parallel-1 value: 1

Time: 1549296674523 thread: parallel-1 value: 11

Time: 1549296674525 thread: parallel-2 value: 22

Time: 1549296674525 thread: parallel-2 value: 221

Time: 1549296674526 thread: parallel-3 value: 32

Time: 1549296674526 thread: parallel-3 value: 321

Time: 1549296674526 thread: parallel-3 value: 33

Time: 1549296674526 thread: parallel-3 value: 331

Time: 1549296674526 thread: parallel-3 value: 332

Time: 1549296674526 thread: parallel-3 value: 3321

Time: 1549296674526 thread: parallel-4 value: 42

Time: 1549296674526 thread: parallel-4 value: 421

Time: 1549296674526 thread: parallel-4 value: 43

Time: 1549296674526 thread: parallel-4 value: 431

Time: 1549296674526 thread: parallel-4 value: 432

Time: 1549296674526 thread: parallel-4 value: 4321

Time: 1549296674527 thread: parallel-4 value: 44

Time: 1549296674527 thread: parallel-4 value: 441

Time: 1549296674527 thread: parallel-4 value: 442

Time: 1549296674527 thread: parallel-4 value: 4421

Time: 1549296674528 thread: parallel-4 value: 443

Time: 1549296674528 thread: parallel-4 value: 4431

Time: 1549296674528 thread: parallel-4 value: 4432

如您所見expander,在并行線程中工作。


查看完整回答
反對 回復 2022-06-04
?
ibeautiful

TA貢獻1993條經驗 獲得超6個贊

這是一個示例,基于YauhenBalykin給出的示例:


public static void main(String[] args) {


    Function<Node, Flux<Node>> expander =

            node -> Flux.fromIterable(node.children)

            .subscribeOn(Schedulers.parallel());


    List<Node> roots = createTestNodes();


    Flux.fromIterable(roots)

            .expand(expander)

            .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))

            .subscribe();


    try {

        Thread.sleep(500);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

    System.out.println("finished");


}

測試數據:


static final class Node {

    final String name;

    final List<Node> children;


    Node(String name, Node... nodes) {

        this.name = name;

        this.children = new ArrayList<>();

        children.addAll(Arrays.asList(nodes));

    }


    @Override

    public String toString() {

        return name;

    }

}


static List<Node> createTestNodes() {

    return new Node("root",

            new Node("1",

                    new Node("11")

            ),

            new Node("2",

                    new Node("21"),

                    new Node("22",

                            new Node("221")

                    )

            ),

            new Node("3",

                    new Node("31"),

                    new Node("32",

                            new Node("321")

                    ),

                    new Node("33",

                            new Node("331"),

                            new Node("332",

                                    new Node("3321")

                            )

                    )

            ),

            new Node("4",

                    new Node("41"),

                    new Node("42",

                            new Node("421")

                    ),

                    new Node("43",

                            new Node("431"),

                            new Node("432",

                                    new Node("4321")

                            )

                    ),

                    new Node("44",

                            new Node("441"),

                            new Node("442",

                                    new Node("4421")

                            ),

                            new Node("443",

                                    new Node("4431"),

                                    new Node("4432")

                            )

                    )

            )

    ).children;

}

結果:


Time: 1636182895717 thread: main value: 1

Time: 1636182895754 thread: main value: 2

Time: 1636182895754 thread: main value: 3

Time: 1636182895754 thread: main value: 4

Time: 1636182895761 thread: parallel-1 value: 11

Time: 1636182895761 thread: parallel-2 value: 21

Time: 1636182895761 thread: parallel-2 value: 22

Time: 1636182895762 thread: parallel-3 value: 31

Time: 1636182895762 thread: parallel-3 value: 32

Time: 1636182895762 thread: parallel-3 value: 33

Time: 1636182895762 thread: parallel-4 value: 41

Time: 1636182895762 thread: parallel-4 value: 42

Time: 1636182895762 thread: parallel-4 value: 43

Time: 1636182895762 thread: parallel-4 value: 44

Time: 1636182895764 thread: parallel-7 value: 221

Time: 1636182895764 thread: parallel-9 value: 321

Time: 1636182895764 thread: parallel-10 value: 331

Time: 1636182895765 thread: parallel-10 value: 332

Time: 1636182895765 thread: parallel-12 value: 421

Time: 1636182895765 thread: parallel-1 value: 431

Time: 1636182895765 thread: parallel-1 value: 432

Time: 1636182895766 thread: parallel-2 value: 441

Time: 1636182895766 thread: parallel-2 value: 442

Time: 1636182895766 thread: parallel-2 value: 443

Time: 1636182895766 thread: parallel-6 value: 3321

Time: 1636182895767 thread: parallel-9 value: 4321

Time: 1636182895767 thread: parallel-11 value: 4421

Time: 1636182895767 thread: parallel-12 value: 4431

Time: 1636182895767 thread: parallel-12 value: 4432

finished


查看完整回答
反對 回復 2022-06-04
  • 2 回答
  • 0 關注
  • 179 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號