假設我有 3 個發布者和 1 個處理者。發布者以 形式發布項目{key: <integer>, value: <object>, publisher_id: <string>}。發布者進行 IO 操作,因此:一方面,我希望出版商N在特定時刻處理(大致)項目。另一方面,我希望消費者將項目合并{key: <integer>, values: <list>}到一個記錄中(即)我實際上已經實現了一個FluxProcessor具有內部存儲空間 ( ConcurrentHashMap) 來保存所有項目的。request()只要未達到 CAPACITY,它就會手動添加新項目。我想知道是否有內置功能可以使用 RxJava(2)/ Spring Reactor API 來做到這一點?
1 回答

慕姐8265434
TA貢獻1813條經驗 獲得超2個贊
在 RxJava 2 中使用 merge、rebatchRequests 和 toMultimap:
Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...
Flowable.merge(
source1.rebatchRequests(N),
source2.rebatchRequests(N),
source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));
添加回答
舉報
0/150
提交
取消