2 回答

TA貢獻1887條經驗 獲得超5個贊
這個想法是首先獲取 refs 的完整列表,然后同時獲取 Products、Items 和 Warehouses——我稱之為 Tuple3 lookups。然后將每一個 ref 與 lookups 組合起來,并一一轉換為 JSONObject。
return Mono.fromCallable(jdbc::queryForList) //fetches refs
? ? ? ? ? ? ? ? .subscribeOn(Schedulers.elastic())
? ? ? ? ? ? ? ? .flatMapMany(refList -> { //flatMapMany allows to convert Mono to Flux in flatMap operation
? ? ? ? ? ? ? ? ? ? ? ? ? ? Flux<Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>>> lookups = Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList))
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .cache().repeat(); //notice cache - it makes sure that Mono.zip is executed only once, not for each zipWith call
? ? ? ? ? ? ? ? ? ? ? ? ? ? return Flux.fromIterable(refList)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .zipWith(lookups);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? )
? ? ? ? ? ? ? ? .map(t -> {
? ? ? ? ? ? ? ? ? ? Long ref = t.getT1();
? ? ? ? ? ? ? ? ? ? Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>> lookups = t.getT2();
? ? ? ? ? ? ? ? ? ? JSONObject json = new JSONObject();
? ? ? ? ? ? ? ? ? ? json.put("ref", ref);
? ? ? ? ? ? ? ? ? ? json.put("product", lookups.getT1().get(ref));
? ? ? ? ? ? ? ? ? ? json.put("item", lookups.getT2().get(ref));
? ? ? ? ? ? ? ? ? ? json.put("warehouse", lookups.getT3().get(ref));
? ? ? ? ? ? ? ? ? ? return json;
? ? ? ? ? ? ? ? });
每個數據庫調用的方法:
Mono<Map<Long, Product>> fetchProducts(List<Long> refs) {
? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from products where ref IN(:refs),params))
? ? ? ? .subscribeOn(Schedulers.elastic());
}
Mono<Map<Long, Item>> fetchItems(List<Long> refs) {
? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from items where ref IN(:refs),params))
? ? ? ? .subscribeOn(Schedulers.elastic());
}
Mono<Map<Long, Warehouse>> fetchWarehouses(List<Long> refs) {
? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from warehouses where ref IN(:refs),params))
? ? ? ? .subscribeOn(Schedulers.elastic());
}
為什么我需要 subscribeOn?
我之所以這樣說是因為兩個原因:
它允許在專用線程池的線程上執行數據庫查詢,從而防止阻塞主線程
它允許真正并行化 Mono.zip。參見這個,它是關于 flatMap 的,但它也適用于 zip。
為了完整起見,在 zip 結果上使用 .flatMap() 時也是可能的。雖然我不確定這里是否還有必要使用 .cache()。
? .flatMapMany(refList -> {
? ? ? ? Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList)).cache()
? ? ? ? ? ? .flatMap(tuple -> Flux.fromIterable(refList).map(refId -> Tuples.of(refId, tuple)));
? ? .map(tuple -> {
? ? ? ? String refId = tuple.getT1();
? ? ? ? Tuple lookups = tuple.getT2();
? ? }
})

TA貢獻1893條經驗 獲得超10個贊
如果我理解得很好,您想通過將所有引用作為參數傳遞來執行查詢。
它不會真正成為一個事件流,因為它會等到所有查詢都完成并且所有 json 對象都在內存中,然后才開始流式傳輸它們。
/**
 * Batch variant: fetches the refs, queries all entities for the whole ref list, builds every
 * JSON object, and only then emits them one by one.
 *
 * @return a Flux emitting the JSON objects produced by {@code createJsonObjects}
 */
public Flux<JSONObject> getJsonFlux()
{
    // Everything up to the complete list of JSON objects is a single-valued pipeline.
    Mono<List<JSONObject>> jsonLists = Mono.fromCallable(jdbc::queryForList)
        // elastic thread pool meant for blocking IO, you can use a custom one
        .subscribeOn(Schedulers.elastic())
        .flatMap(this::queryEntities)
        .map(this::createJsonObjects);

    // Fan the finished list out into individual elements.
    return jsonLists.flatMapMany(Flux::fromIterable);
}
/**
 * Queries products, items and warehouses for the given refs and bundles the results, together
 * with the refs themselves, into a single tuple.
 *
 * @param refs the refs passed to each of the three queries
 * @return a Mono emitting (refs, products, items, warehouses)
 */
private Mono<Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>>> queryEntities(List<Long> refs)
{
    // Each query is wrapped separately and subscribed on the elastic scheduler,
    // so the query calls will be concurrent.
    return Mono.zip(
        Mono.just(refs),
        Mono.fromCallable(() -> jdbc.queryProducts(refs)).subscribeOn(Schedulers.elastic()),
        Mono.fromCallable(() -> jdbc.queryItems(refs)).subscribeOn(Schedulers.elastic()),
        Mono.fromCallable(() -> jdbc.queryWarehouses(refs)).subscribeOn(Schedulers.elastic()));
}
/**
 * Creates one JSON object per ref contained in the tuple.
 *
 * <p>The products, items and warehouses are unpacked for use when populating each object; the
 * population step itself is left as a placeholder.
 *
 * @param tuple (refs, products, items, warehouses) as produced by {@code queryEntities}
 * @return the JSON objects, in the iteration order of the refs
 */
private List<JSONObject> createJsonObjects(Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>> tuple)
{
    final List<Long> refList = tuple.getT1();
    final List<Product> productList = tuple.getT2();
    final List<Item> itemList = tuple.getT3();
    final List<Warehouse> warehouseList = tuple.getT4();

    final List<JSONObject> result = new ArrayList<>();
    refList.forEach(ref -> {
        JSONObject json = new JSONObject();
        // build json object here
        result.add(json);
    });
    return result;
}
另一種方法是分別查詢每個引用的實體。這樣每個 JSONObject 都被單獨查詢,并且它們可以在流中交錯。我不確定數據庫如何處理這種負載。這是你應該考慮的事情。
/**
 * Per-ref variant: fetches the refs, then queries the entities of each ref separately and maps
 * every result to its own JSON object.
 *
 * @return a Flux emitting one JSON object per queried ref
 */
public Flux<JSONObject> getJsonFlux()
{
    // One entity tuple per ref; flatMap subscribes to the per-ref queries as refs arrive.
    Flux<Tuple4<Long, Product, Item, Warehouse>> entities = Mono.fromCallable(jdbc::queryForList)
        .flatMapMany(Flux::fromIterable)
        // elastic thread pool meant for blocking IO, you can use a custom one
        .subscribeOn(Schedulers.elastic())
        .flatMap(this::queryEntities);

    return entities.map(this::createJsonObject);
}
/**
 * Queries the product, item and warehouse of a single ref and bundles them with the ref.
 *
 * <p>NOTE(review): per the Reactor documentation, {@code Mono.fromCallable} completes empty when
 * the callable returns null, and {@code Mono.zip} completes empty as soon as one source is
 * empty. If any of the jdbc calls can return null, that ref would presumably produce no tuple
 * at all - confirm this is the intended behavior.
 *
 * @param ref the ref passed to each of the three queries
 * @return a Mono emitting (ref, product, item, warehouse)
 */
private Mono<Tuple4<Long, Product, Item, Warehouse>> queryEntities(Long ref)
{
    final Mono<Product> productQuery =
        Mono.fromCallable(() -> jdbc.queryProduct(ref)).subscribeOn(Schedulers.elastic());
    final Mono<Item> itemQuery =
        Mono.fromCallable(() -> jdbc.queryItem(ref)).subscribeOn(Schedulers.elastic());
    final Mono<Warehouse> warehouseQuery =
        Mono.fromCallable(() -> jdbc.queryWarehouse(ref)).subscribeOn(Schedulers.elastic());

    // Each source is subscribed on the elastic scheduler, so the query calls will be concurrent.
    return Mono.zip(Mono.just(ref), productQuery, itemQuery, warehouseQuery);
}
/**
 * Creates the JSON object for one (ref, product, item, warehouse) tuple.
 *
 * <p>The tuple components are unpacked for use when populating the object; the population step
 * itself is left as a placeholder.
 *
 * @param tuple the entities belonging to a single ref, as produced by {@code queryEntities}
 * @return the JSON object for that ref
 */
private JSONObject createJsonObject(Tuple4<Long, Product, Item, Warehouse> tuple)
{
    final Long refId = tuple.getT1();
    final Product refProduct = tuple.getT2();
    final Item refItem = tuple.getT3();
    final Warehouse refWarehouse = tuple.getT4();

    final JSONObject result = new JSONObject();
    // build json object here
    return result;
}
添加回答
舉報