亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

RxJava 相當于簡單的 ThreadPoolExecutor 示例

RxJava 相當于簡單的 ThreadPoolExecutor 示例

千萬里不及你 2023-09-27 16:54:17
我已經退出 Java 游戲大約 8 年了,從那時起發生了很多變化。對我來說最大的挑戰是 RxJava / 反應式。我正在尋找有關如何以完全反應式方式執行以下等效操作的粗略指導。Stuff下面使用 ThreadPoolExecutor 實現的基本要求是通過調用遠程 Web 服務來處理大量數據,該服務的記錄速率限制為 100 個請求/分鐘。我的目標是盡可能快地處理盡可能多的數據,不丟失任何數據,Stuff但仍遵守下游速率限制。該代碼已被簡化,以避免錯誤、隔板、斷路器、重試邏輯等。這段代碼目前工作正常,但在所有非阻塞反應選項的情況下,它會導致感覺浪費了很多線程。甚至我用來調用服務的 HTTP 客戶端也會返回 a Flowable,我只是在執行程序的 20 個線程中的每個線程中阻塞它。我很想了解反應性等價物應該是什么。我一直在努力的地方是我發現的幾乎所有文檔都展示了使用 Observable 的靜態源(例如Observable.fromArray(1,2,3,4,5):)。我知道解決方案可能涉及IoScheduler和groupBy,但我還沒有弄清楚如何Flowable將來自我的 HTTP 客戶端的 s 合并到某個完整的鏈中,該鏈可以進行并行化(最多限制,例如 20)和速率限制。public class Example {    private static final int THREADS = 20;    // using https://docs.micronaut.io/latest/guide/index.html#httpClient    @Client("http://stuff-processor.internal:8080")    @Inject    RxHttpClient httpClient;    private ThreadPoolExecutor executor;    private final RateLimiter rateLimiter;    public Example() {        // up to 20 threads to process the unbounded queue        // incoming Stuff is very bursty...        // ...we could go hours without anything and then hundreds could come in        this.executor = new ThreadPoolExecutor(THREADS, THREADS,                30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());        this.executor.allowCoreThreadTimeOut(true);        // using https://resilience4j.readme.io/docs/ratelimiter        RateLimiterConfig config = RateLimiterConfig.custom()                .limitRefreshPeriod(Duration.ofSeconds(60))                .limitForPeriod(100)                .timeoutDuration(Duration.ofSeconds(90))                .build();        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);    }    /**     * Called when the user takes an action that can cause 1 or 1000s of new     * Stuff to be entered into the system. Each instance of Stuff results in     * a separate call to this method. Ex: 100 Stuffs = 100 calls.     */
查看完整描述

1 回答

?
Cats萌萌

TA貢獻1805條經驗 獲得超9個贊

首先,要以完全非阻塞的方式構建它,您需要使用像 Netty 這樣的非阻塞、異步 HTTP 客戶端庫。我不確定如何RxHttpClient運作。

假設你有一個 list stuff。我就是這樣做的:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();

flatMap合并響應。

為了限制速率,您flatMap有第二個參數,它限制它并行訂閱的內部流的數量。假設您想同時撥打不超過 10 個電話。做這個:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();



查看完整回答
反對 回復 2023-09-27
  • 1 回答
  • 0 關注
  • 125 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號