亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何等待 JavaRx2 Flowable 完成所有任務?

如何等待 JavaRx2 Flowable 完成所有任務?

catspeake 2023-08-04 14:51:14
我正在嘗試學習 RxJava2 庫的基礎知識,現在我陷入了以下時刻:我已經生成了myFlowablevia Flowable.generate(...),現在我需要等待所有任務完成執行,然后才能繼續下一步。這是展示問題的代碼:myFlowable.parallel()            .runOn(Schedulers.computation())            .map(val -> myCollection.add(val))            .sequential()            .subscribe(val -> {                System.out.println("Thread from subscribe: " + Thread.currentThread().getName());                System.out.println("Value from subscribe: " + val.toString());            });    System.out.println("Before sleep - Number of objects: " + myCollection.size());    try {        Thread.sleep(1000);        System.out.println("After sleep - Number of objects: " + myCollection.size());    } catch (InterruptedException e) {        e.printStackTrace();    }我運行所有任務并將結果添加到集合中。如果我在 myFlowable 塊之后立即檢查集合大小,那么如果我在small之后檢查它,情況將會有所不同Thread.sleep()。有什么方法可以檢查所有任務是否已完成執行并且我們可以進一步進行?任何幫助或指導將不勝感激。
查看完整描述

3 回答

?
拉風的咖菲貓

TA貢獻1995條經驗 獲得超2個贊

使用Flowable::blockingSubscribe()- 將當前 Flowable 運行到終端事件,忽略任何值并重新拋出任何異常。

查看完整回答
反對 回復 2023-08-04
?
守著星空守著你

TA貢獻1799條經驗 獲得超8個贊

由于 RxJava 是異步的,observable 下面的 java 代碼將運行,而 observable 將在不同的線程中運行,這就是為什么如果您想在 Flowable 已完成發送數據時收到通知,您應該在 RxJava 流中執行此操作。為此,您有一個運算符 .doOnComplete 這里有一個示例如何檢測流何時完成


        Flowable.range(0, 100).parallel()

            .runOn(Schedulers.computation())

            .map(integer -> {


                return integer;

            })

            .sequential()

            .doOnComplete(() -> {

                System.out.println("finished");

            })

            .subscribe(integer -> System.out.println(integer));


查看完整回答
反對 回復 2023-08-04
?
小唯快跑啊

TA貢獻1863條經驗 獲得超2個贊

您可以使用 AtomicBoolean,將其初始化為 false 并使用 將其設置為 true doFinally()。


doFinally()在 Observable 發出 onError 或 onCompleted 信號后調用,或者被下游處理。


然后讓主線程休眠,直到completedvalue 為 true。


使用你的例子:


AtomicBoolean completed = new AtomicBoolean(false);


myFlowable.parallel()

            .runOn(Schedulers.computation())

            .map(val -> myCollection.add(val))

            .sequential()

            .doFinally(() -> completed.set(true))

            .subscribe(val -> {

                ...

            });

    ...

try {

   while(!completed.get()){

       Thread.sleep(1000);

       ...

   }

  ...

} catch (InterruptedException e) {

  e.printStackTrace();

}


查看完整回答
反對 回復 2023-08-04
  • 3 回答
  • 0 關注
  • 201 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號