亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

為什么使用 Sink.as 創建的發布服務器在被廣播中心使用時無法正常工作?

為什么使用 Sink.as 創建的發布服務器在被廣播中心使用時無法正常工作?

HUX布斯 2022-09-22 15:51:42
我們有一個多組件應用程序,它在組件之間提供反應式流API。一些組件使用Akka流實現,其他組件使用例如反應器。在一個組件中,我們注意到 Streams 不處理任何消息,盡管提供的發布者提供記錄。我把問題歸結為以下情況:Publisher<String> stringPublisher = Source     .from(Lists.newArrayList("Hello", "World", "!"))     .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer); Source<String, NotUsed> allMessages = Source     .fromPublisher(stringPublisher)     .toMat(BroadcastHub.of(String.class, 256), Keep.right())     .run(materializer); allMessages     .runForeach(System.out::println, materializer)     .toCompletableFuture()     .get();一個組件提供發布服務器(它需要是發布者,因為 API 使用反應式流 API,而不是 Akka 流 API)。此發布服務器是從另一個 Akka 流源創建的,并使用 轉換為發布服務器。Sink.asPublisher現在,當我們使用 BroadcastHub 從發布者開始實現流時,根本不會處理任何記錄。我對反應堆出版商也試過同樣的方法:Publisher<String> stringPublisher = Flux.fromIterable(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);這按預期工作。不幸的是,我不能排除另一個組件從 Akka 流源創建其發布服務器的情況。有沒有人知道出了什么問題?
查看完整描述

1 回答

?
浮云間

TA貢獻1829條經驗 獲得超4個贊

我現在知道如何解決它,如果我開始在map材料化值中使用廣播中心的結果源,它就會起作用:

Publisher<String> stringPublisher = Source
    .from(Lists.newArrayList("Hello", "World", "!"))
    .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

Source
    .fromPublisher(stringPublisher)
    .alsoToMat(BroadcastHub.of(String.class, 256), Keep.right())
    .mapMaterializedValue(source -> source
         .runWith(Sink.foreach(System.out::println, materializer))
    .run(materializer)
    .toCompletableFuture()
    .get();

編輯:TL;DR:在光彎論壇上有這樣的解釋:

此處發生的情況是,當您附加另一個流時,主流流已經完成。有時,在完成之前查看一些元素可能足夠快。

---

因此,看起來廣播中心實際上在消費者附加到廣播中心創建的源之前刪除了元素。

文檔說它不會掉落:

如果沒有訂閱者連接到此集線器,則它不會丟棄任何元素,而是對上游生產者進行背壓,直到訂閱者到達。

https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html

實際上,在大多數情況下都是如此,但是我發現有些情況下它的行為不正確:

public void testBH3() throws ExecutionException, InterruptedException {

    Publisher<String> stringPublisher = Source

        .from(Lists.newArrayList("Hello", "World", "!"))

        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);


    Source<String, NotUsed> allMessages = Source

        .fromPublisher(stringPublisher)

        .toMat(BroadcastHub.of(String.class, 256), Keep.right())

        .run(materializer);


    allMessages

        .runForeach(System.out::println, materializer)

        .toCompletableFuture()

        .get();

}


public void repeat() throws ExecutionException, InterruptedException {

    for (int i = 0; i < 100; i++) {

        testBH3();

        System.out.println("------");

    }

}

這適用于 100 個案例中的大約 3 個。但以下方法適用于所有情況(我只是添加了一個節流閥來產生較慢的元素):


public void testBH3() throws ExecutionException, InterruptedException {

    Publisher<String> stringPublisher = Source

        .from(Lists.newArrayList("Hello", "World", "!"))

        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);


    Source<String, NotUsed> allMessages = Source

        .fromPublisher(stringPublisher)

        .throttle(1, Duration.ofSeconds(1))

        .toMat(BroadcastHub.of(String.class, 256), Keep.right())

        .run(materializer);


    allMessages

        .runForeach(System.out::println, materializer)

        .toCompletableFuture()

        .get();

}

因此,在我看來,當沒有已經連接Sink時,廣播中心有時會丟棄元素。


查看完整回答
反對 回復 2022-09-22
  • 1 回答
  • 0 關注
  • 120 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號