1 回答

TA貢獻1829條經驗 獲得超4個贊
我現在知道如何解決它,如果我開始在map材料化值中使用廣播中心的結果源,它就會起作用:
Publisher<String> stringPublisher = Source .from(Lists.newArrayList("Hello", "World", "!")) .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer); Source .fromPublisher(stringPublisher) .alsoToMat(BroadcastHub.of(String.class, 256), Keep.right()) .mapMaterializedValue(source -> source .runWith(Sink.foreach(System.out::println, materializer)) .run(materializer) .toCompletableFuture() .get();
編輯:TL;DR:在光彎論壇上有這樣的解釋:
此處發生的情況是,當您附加另一個流時,主流流已經完成。有時,在完成之前查看一些元素可能足夠快。
---
因此,看起來廣播中心實際上在消費者附加到廣播中心創建的源之前刪除了元素。
文檔說它不會掉落:
如果沒有訂閱者連接到此集線器,則它不會丟棄任何元素,而是對上游生產者進行背壓,直到訂閱者到達。
https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html
實際上,在大多數情況下都是如此,但是我發現有些情況下它的行為不正確:
public void testBH3() throws ExecutionException, InterruptedException {
Publisher<String> stringPublisher = Source
.from(Lists.newArrayList("Hello", "World", "!"))
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
Source<String, NotUsed> allMessages = Source
.fromPublisher(stringPublisher)
.toMat(BroadcastHub.of(String.class, 256), Keep.right())
.run(materializer);
allMessages
.runForeach(System.out::println, materializer)
.toCompletableFuture()
.get();
}
public void repeat() throws ExecutionException, InterruptedException {
for (int i = 0; i < 100; i++) {
testBH3();
System.out.println("------");
}
}
這適用于 100 個案例中的大約 3 個。但以下方法適用于所有情況(我只是添加了一個節流閥來產生較慢的元素):
public void testBH3() throws ExecutionException, InterruptedException {
Publisher<String> stringPublisher = Source
.from(Lists.newArrayList("Hello", "World", "!"))
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
Source<String, NotUsed> allMessages = Source
.fromPublisher(stringPublisher)
.throttle(1, Duration.ofSeconds(1))
.toMat(BroadcastHub.of(String.class, 256), Keep.right())
.run(materializer);
allMessages
.runForeach(System.out::println, materializer)
.toCompletableFuture()
.get();
}
因此,在我看來,當沒有已經連接Sink時,廣播中心有時會丟棄元素。
添加回答
舉報