亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Reactive Streams:如何按鍵等待所有發布者?

Reactive Streams:如何按鍵等待所有發布者?

慕雪6442864 2023-02-16 15:45:35
假設我有 3 個發布者和 1 個處理者。發布者以 形式發布項目{key: <integer>, value: <object>, publisher_id: <string>}。發布者進行 IO 操作,因此:一方面,我希望出版商N在特定時刻處理(大致)項目。另一方面,我希望消費者將項目合并{key: <integer>, values: <list>}到一個記錄中(即)我實際上已經實現了一個FluxProcessor具有內部存儲空間 ( ConcurrentHashMap) 來保存所有項目的。request()只要未達到 CAPACITY,它就會手動添加新項目。我想知道是否有內置功能可以使用 RxJava(2)/ Spring Reactor API 來做到這一點?
查看完整描述

1 回答

?
慕姐8265434

TA貢獻1813條經驗 獲得超2個贊

在 RxJava 2 中使用 merge、rebatchRequests 和 toMultimap:


Flowable<KeyValuePublisher> source1 = ...

Flowable<KeyValuePublisher> source2 = ...

Flowable<KeyValuePublisher> source3 = ...


Flowable.merge(

    source1.rebatchRequests(N),

    source2.rebatchRequests(N),

    source3.rebatchRequests(N)

)

.toMultimap(kvp -> kvp.key, kvp -> kvp.value)

subscribe(map -> System.out.println(map));


查看完整回答
反對 回復 2023-02-16
  • 1 回答
  • 0 關注
  • 94 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號