亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在 Spring Flux 中并行化數據庫查詢?

如何在 Spring Flux 中并行化數據庫查詢?

qq_花開花謝_0 2023-04-19 16:07:42
我想在 Spring 中mysql使用Flux<JSONObject>流公開來自數據庫的聚合結果。@RestControllerpublic class FluxController {     @GetMapping("/", produces = TEXT_EVENT_STREAM_VALUE)     public Flux<JSONObject> stream() {          return service.getJson();     }}@Servicepublic class DatabaseService {    public List<JSONObject> getJson() {        List<Long> refs = jdbc.queryForList(...);        MapSqlParameterSource params = new MapSqlParameterSource();        params.addValue("refs", refs);        //of course real world sql is much more complex        List<Long, Product> products = jdbc.query(SELECT * from products where ref IN (:refs), params);        List<Long, Item> items = jdbc.query(SELECT * from items where ref IN (:refs), params);        List<Long, Warehouse> warehouses = jdbc.query(SELECT * from warehouses where ref IN (:refs), params);        List<JSONObject> results = new ArrayList<>();        for (Long ref : refs) {            JSONObject json = new JSONObject();            json.put("ref", ref);            json.put("product", products.get(ref));            json.put("item", items.get(ref));            json.put("warehouse", warehouses.get(ref));            results.add(json);        }        return results;    }現在我想將其轉換為通量,將其作為事件流公開。但是我怎樣才能并行化數據庫查找并將它鏈接在一起成為一個通量呢?    public Flux<JSONObject> getJsonFlux() {        //I need this as source        List<Long> refs = jdbc.queryForList(...);        return Flux.fromIterable(refs).map(refs -> {            //TODO how to aggregate the different database calls concurrently?            //and then expose each JSONObject one by one into the stream as soon as it is build?        };    }旁注:我知道這仍然會阻塞。但在我的實際應用程序中,我正在應用分頁和分塊,所以每個塊都會在準備好時暴露給流。然后主要問題是我不知道如何并行化,然后聚合/合并結果,例如在最后一個通量步驟中。
查看完整描述

2 回答

?
慕工程0101907

TA貢獻1887條經驗 獲得超5個贊

這個想法是首先獲取 的完整列表refs,然后同時獲取 Products、Items 和 Warehouses——我稱之為 Tuple3 lookups。然后將每一個ref與組合起來,并一一lookups轉換。JSONObject


return Mono.fromCallable(jdbc::queryForList) //fetches refs

? ? ? ? ? ? ? ? .subscribeOn(Schedulers.elastic())

? ? ? ? ? ? ? ? .flatMapMany(refList -> { //flatMapMany allows to convert Mono to Flux in flatMap operation

? ? ? ? ? ? ? ? ? ? ? ? ? ? Flux<Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>>> lookups = Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList))

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .cache().repeat(); //notice cache - it makes sure that Mono.zip is executed only once, not for each zipWith call


? ? ? ? ? ? ? ? ? ? ? ? ? ? return Flux.fromIterable(refList)

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .zipWith(lookups);

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? )

? ? ? ? ? ? ? ? .map(t -> {

? ? ? ? ? ? ? ? ? ? Long ref = t.getT1();

? ? ? ? ? ? ? ? ? ? Tuple3<Map<Long, Product>, Map<Long, Item>, Map<Long, Warehouse>> lookups = t.getT2();

? ? ? ? ? ? ? ? ? ? JSONObject json = new JSONObject();

? ? ? ? ? ? ? ? ? ? json.put("ref", ref);

? ? ? ? ? ? ? ? ? ? json.put("product", lookups.getT1().get(ref));

? ? ? ? ? ? ? ? ? ? json.put("item", lookups.getT2().get(ref));

? ? ? ? ? ? ? ? ? ? json.put("warehouse", lookups.getT3().get(ref));

? ? ? ? ? ? ? ? ? ? return json;

? ? ? ? ? ? ? ? });

每個數據庫調用的方法:


Mono<Map<Long, Product>> fetchProducts(List<Long> refs) {

? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from products where ref IN(:refs),params))

? ? ? ? .subscribeOn(Schedulers.elastic());

}


Mono<Map<Long, Item>> fetchItems(List<Long> refs) {

? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from items where ref IN(:refs),params))

? ? ? ? .subscribeOn(Schedulers.elastic());

}


Mono<Map<Long, Warehouse>> fetchWarehouses(List<Long> refs) {

? ? return Mono.fromCallable(() -> jdbc.query(SELECT * from warehouses where ref IN(:refs),params))

? ? ? ? .subscribeOn(Schedulers.elastic());

}

為什么我需要訂閱?

我之所以這樣說是因為兩個原因:

  1. 它允許在專用線程池的線程上執行數據庫查詢,從而防止阻塞主線程

  2. 它允許真正并行化Mono.zip。看到這個,它是關于的flatMap,但它也適用于zip


.flatMap()為了完整起見,在 zip 結果上使用時也是可能的。雖然我不確定.cache()這里是否還有必要。

? .flatMapMany(refList -> {

? ? ? ? Mono.zip(fetchProducts(refList), fetchItems(refList), fetchWarehouses(refList)).cache()

? ? ? ? ? ? .flatMap(tuple -> Flux.fromIterable(refList).map(refId -> Tuples.of(refId, tuple)));

? ? .map(tuple -> {

? ? ? ? String refId = tuple.getT1();

? ? ? ? Tuple lookups = tuple.getT2();

? ? }

})


查看完整回答
反對 回復 2023-04-19
?
白豬掌柜的

TA貢獻1893條經驗 獲得超10個贊

如果我理解得很好,您想通過將所有引用作為參數傳遞來執行查詢。


它不會真正成為一個事件流,因為它會等到所有查詢都完成并且所有 json 對象都在內存中,然后才開始流式傳輸它們。


public Flux<JSONObject> getJsonFlux()

{

    return Mono.fromCallable(jdbc::queryForList)

               .subscribeOn(Schedulers.elastic()) // elastic thread pool meant for blocking IO, you can use a custom one

               .flatMap(this::queryEntities)

               .map(this::createJsonObjects)

               .flatMapMany(Flux::fromIterable);

}


private Mono<Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>>> queryEntities(List<Long> refs)

{

    Mono<List<Product>> products = Mono.fromCallable(() -> jdbc.queryProducts(refs)).subscribeOn(Schedulers.elastic());

    Mono<List<Item>> items = Mono.fromCallable(() -> jdbc.queryItems(refs)).subscribeOn(Schedulers.elastic());

    Mono<List<Warehouse>> warehouses = Mono.fromCallable(() -> jdbc.queryWarehouses(refs)).subscribeOn(Schedulers.elastic());


    return Mono.zip(Mono.just(refs), products, items, warehouses); // query calls will be concurrent

}


private List<JSONObject> createJsonObjects(Tuple4<List<Long>, List<Product>, List<Item>, List<Warehouse>> tuple)

{

    List<Long> refs = tuple.getT1();

    List<Product> products = tuple.getT2();

    List<Item> items = tuple.getT3();

    List<Warehouse> warehouses = tuple.getT4();


    List<JSONObject> jsonObjects = new ArrayList<>();


    for (Long ref : refs)

    {

        JSONObject json = new JSONObject();

        // build json object here


        jsonObjects.add(json);

    }


    return jsonObjects;

}

另一種方法是分別查詢每個引用的實體。這樣每個 JSONObject 都被單獨查詢,并且它們可以在流中交錯。我不確定數據庫如何處理這種負載。這是你應該考慮的事情。


public Flux<JSONObject> getJsonFlux()

{

    return Mono.fromCallable(jdbc::queryForList)

               .flatMapMany(Flux::fromIterable)

               .subscribeOn(Schedulers.elastic()) // elastic thread pool meant for blocking IO, you can use a custom one

               .flatMap(this::queryEntities)

               .map(this::createJsonObject);

}


private Mono<Tuple4<Long, Product, Item, Warehouse>> queryEntities(Long ref)

{

    Mono<Product> product = Mono.fromCallable(() -> jdbc.queryProduct(ref)).subscribeOn(Schedulers.elastic());

    Mono<Item> item = Mono.fromCallable(() -> jdbc.queryItem(ref)).subscribeOn(Schedulers.elastic());

    Mono<Warehouse> warehouse = Mono.fromCallable(() -> jdbc.queryWarehouse(ref))

                                     .subscribeOn(Schedulers.elastic());


    return Mono.zip(Mono.just(ref), product, item, warehouse); // query calls will be concurrent

}


private JSONObject createJsonObject(Tuple4<Long, Product, Item, Warehouse> tuple)

{

    Long ref = tuple.getT1();

    Product product = tuple.getT2();

    Item item = tuple.getT3();

    Warehouse warehouse = tuple.getT4();


    JSONObject json = new JSONObject();

    // build json object here


    return json;

}


查看完整回答
反對 回復 2023-04-19
  • 2 回答
  • 0 關注
  • 195 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號