2 Answers

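Contributed 1804 experience points, received 3+ likes
Yes, you are right: ParallelFlux does not have .expand() or .expandDeep() methods. There is another way, though: create an additional Publisher that does have the expand methods and pass it to your ParallelFlux, like this: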
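import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public static void main(String[] args) {
    // Expander: maps a node to a Flux of its direct children.
    Function<Node, Flux<Node>> expander =
            node -> Flux.fromIterable(node.children);
    List<Node> roots = createTestNodes();
    Flux.fromIterable(roots)
            .parallel(4)                   // split the roots across 4 rails
            .runOn(Schedulers.parallel())  // run each rail on a parallel scheduler thread
            // ParallelFlux has no expandDeep(), so expand each root inside flatMap.
            .flatMap(node -> Flux.just(node).expandDeep(expander))
            .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis()
                    + " thread: " + Thread.currentThread().getName()
                    + " value: " + i))
            .sequential()
            .subscribe();
    // Crude wait so the asynchronous pipeline can finish before main returns.
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("finished");
}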
My test data:
static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... nodes) {
        this.name = name;
        this.children = new ArrayList<>();
        children.addAll(Arrays.asList(nodes));
    }

    @Override
    public String toString() {
        return name;
    }
}

static List<Node> createTestNodes() {
    return new Node("root",
            new Node("1",
                    new Node("11")
            ),
            new Node("2",
                    new Node("21"),
                    new Node("22",
                            new Node("221")
                    )
            ),
            new Node("3",
                    new Node("31"),
                    new Node("32",
                            new Node("321")
                    ),
                    new Node("33",
                            new Node("331"),
                            new Node("332",
                                    new Node("3321")
                            )
                    )
            ),
            new Node("4",
                    new Node("41"),
                    new Node("42",
                            new Node("421")
                    ),
                    new Node("43",
                            new Node("431"),
                            new Node("432",
                                    new Node("4321")
                            )
                    ),
                    new Node("44",
                            new Node("441"),
                            new Node("442",
                                    new Node("4421")
                            ),
                            new Node("443",
                                    new Node("4431"),
                                    new Node("4432")
                            )
                    )
            )
    ).children;
}
Result:
Time: 1549296674522 thread: parallel-4 value: 4
Time: 1549296674523 thread: parallel-4 value: 41
Time: 1549296674523 thread: parallel-2 value: 2
Time: 1549296674523 thread: parallel-2 value: 21
Time: 1549296674523 thread: parallel-3 value: 3
Time: 1549296674523 thread: parallel-3 value: 31
Time: 1549296674523 thread: parallel-1 value: 1
Time: 1549296674523 thread: parallel-1 value: 11
Time: 1549296674525 thread: parallel-2 value: 22
Time: 1549296674525 thread: parallel-2 value: 221
Time: 1549296674526 thread: parallel-3 value: 32
Time: 1549296674526 thread: parallel-3 value: 321
Time: 1549296674526 thread: parallel-3 value: 33
Time: 1549296674526 thread: parallel-3 value: 331
Time: 1549296674526 thread: parallel-3 value: 332
Time: 1549296674526 thread: parallel-3 value: 3321
Time: 1549296674526 thread: parallel-4 value: 42
Time: 1549296674526 thread: parallel-4 value: 421
Time: 1549296674526 thread: parallel-4 value: 43
Time: 1549296674526 thread: parallel-4 value: 431
Time: 1549296674526 thread: parallel-4 value: 432
Time: 1549296674526 thread: parallel-4 value: 4321
Time: 1549296674527 thread: parallel-4 value: 44
Time: 1549296674527 thread: parallel-4 value: 441
Time: 1549296674527 thread: parallel-4 value: 442
Time: 1549296674527 thread: parallel-4 value: 4421
Time: 1549296674528 thread: parallel-4 value: 443
Time: 1549296674528 thread: parallel-4 value: 4431
Time: 1549296674528 thread: parallel-4 value: 4432
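As you can see, the expander runs on the parallel threads: each root's subtree is expanded depth-first on the thread of the rail it was assigned to.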
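To make the order in the output above easier to follow, here is a minimal plain-Java sketch (no Reactor involved) of the same idea: each root is handed to a worker thread, and its subtree is walked depth-first, which is the order expandDeep produces. The class name PerRootDeepExpansionSketch, its helper methods, and the small sample tree are all made up for illustration. The thread pool is only a stand-in for parallel(4).runOn(...), not a description of how Reactor implements it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only: a plain-Java analogue, not Reactor code.
final class PerRootDeepExpansionSketch {

    // Simplified stand-in for the Node class used in the answer.
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... nodes) {
            this.name = name;
            this.children = Arrays.asList(nodes);
        }
    }

    // Depth-first (pre-order) walk: the node first, then each child's whole subtree.
    static void expandDeep(Node node, List<String> out) {
        out.add(node.name);
        for (Node child : node.children) {
            expandDeep(child, out);
        }
    }

    // Submits one depth-first walk per root to a fixed pool and
    // returns the visited names, one list per root, in root order.
    static List<List<String>> expandRootsInParallel(List<Node> roots, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (Node root : roots) {
                Callable<List<String>> task = () -> {
                    List<String> visited = new ArrayList<>();
                    expandDeep(root, visited);
                    return visited;
                };
                futures.add(pool.submit(task));
            }
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                results.add(future.get()); // wait for this root's walk to finish
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("expansion failed", e);
        } finally {
            pool.shutdown();
        }
    }

    // Small made-up tree: roots "1" and "2".
    static List<Node> sampleRoots() {
        return Arrays.asList(
                new Node("1", new Node("11")),
                new Node("2", new Node("21"), new Node("22", new Node("221"))));
    }

    public static void main(String[] args) {
        // prints [[1, 11], [2, 21, 22, 221]]
        System.out.println(expandRootsInParallel(sampleRoots(), 4));
    }
}
```

Each inner list is one root's subtree in depth-first order, which matches the per-thread order in the log above (for example 2, 21, 22, 221 on parallel-2).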

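Contributed 1993 experience points, received 6+ likes
Here is an example based on the one given by YauhenBalykin. It keeps a plain Flux with expand() and applies subscribeOn(Schedulers.parallel()) inside the expander instead: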
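import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public static void main(String[] args) {
    // Expander: emits a node's children, subscribed on the parallel scheduler.
    Function<Node, Flux<Node>> expander =
            node -> Flux.fromIterable(node.children)
                    .subscribeOn(Schedulers.parallel());
    List<Node> roots = createTestNodes();
    Flux.fromIterable(roots)
            .expand(expander)  // breadth-first expansion
            .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis()
                    + " thread: " + Thread.currentThread().getName()
                    + " value: " + i))
            .subscribe();
    // Crude wait so the asynchronous pipeline can finish before main returns.
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("finished");
}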
Test data: the same Node class and createTestNodes() method as in the first answer.
Result:
Time: 1636182895717 thread: main value: 1
Time: 1636182895754 thread: main value: 2
Time: 1636182895754 thread: main value: 3
Time: 1636182895754 thread: main value: 4
Time: 1636182895761 thread: parallel-1 value: 11
Time: 1636182895761 thread: parallel-2 value: 21
Time: 1636182895761 thread: parallel-2 value: 22
Time: 1636182895762 thread: parallel-3 value: 31
Time: 1636182895762 thread: parallel-3 value: 32
Time: 1636182895762 thread: parallel-3 value: 33
Time: 1636182895762 thread: parallel-4 value: 41
Time: 1636182895762 thread: parallel-4 value: 42
Time: 1636182895762 thread: parallel-4 value: 43
Time: 1636182895762 thread: parallel-4 value: 44
Time: 1636182895764 thread: parallel-7 value: 221
Time: 1636182895764 thread: parallel-9 value: 321
Time: 1636182895764 thread: parallel-10 value: 331
Time: 1636182895765 thread: parallel-10 value: 332
Time: 1636182895765 thread: parallel-12 value: 421
Time: 1636182895765 thread: parallel-1 value: 431
Time: 1636182895765 thread: parallel-1 value: 432
Time: 1636182895766 thread: parallel-2 value: 441
Time: 1636182895766 thread: parallel-2 value: 442
Time: 1636182895766 thread: parallel-2 value: 443
Time: 1636182895766 thread: parallel-6 value: 3321
Time: 1636182895767 thread: parallel-9 value: 4321
Time: 1636182895767 thread: parallel-11 value: 4421
Time: 1636182895767 thread: parallel-12 value: 4431
Time: 1636182895767 thread: parallel-12 value: 4432
finished
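For comparison, expand walks the tree breadth-first (level by level), while expandDeep goes depth-first. Below is a rough plain-Java sketch of that level-by-level order, assuming a simple FIFO queue and ignoring threads and schedulers entirely. The class BreadthFirstExpandSketch and its sample tree are hypothetical and not part of Reactor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration only: a plain-Java analogue, not Reactor code.
final class BreadthFirstExpandSketch {

    // Simplified stand-in for the Node class used in the answers.
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... nodes) {
            this.name = name;
            this.children = Arrays.asList(nodes);
        }
    }

    // Breadth-first walk: visit a node, then queue its children behind
    // everything that is already waiting.
    static List<String> expandBreadthFirst(List<Node> roots) {
        List<String> visited = new ArrayList<>();
        Queue<Node> pending = new ArrayDeque<>(roots);
        while (!pending.isEmpty()) {
            Node next = pending.remove();
            visited.add(next.name);
            pending.addAll(next.children);
        }
        return visited;
    }

    // Small made-up tree: roots "1" and "2".
    static List<Node> sampleRoots() {
        return Arrays.asList(
                new Node("1", new Node("11")),
                new Node("2", new Node("21"), new Node("22", new Node("221"))));
    }

    public static void main(String[] args) {
        // prints [1, 2, 11, 21, 22, 221]
        System.out.println(expandBreadthFirst(sampleRoots()));
    }
}
```

This is the same pattern as in the log above: all roots first, then their children, then the grandchildren.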