亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Reactor - 理解 .flatMap() 中的線程池

Reactor - 理解 .flatMap() 中的線程池

函數式編程 2023-03-31 15:38:28
我試圖了解反應式編程是如何工作的。我為此準備了簡單的演示:WebClient來自 Spring Framework 的 reactive 將請求發送到簡單的 rest api,并且此客戶端在每個操作中打印線程名稱。休息API:@RestController@SpringBootApplicationpublic class RestApiApplication {    public static void main(String[] args) {        SpringApplication.run(RestApiApplication.class, args);    }    @PostMapping("/resource")    public void consumeResource(@RequestBody Resource resource) {        System.out.println(String.format("consumed resource: %s", resource.toString()));    }}@Data@AllArgsConstructorclass Resource {    private final Long id;    private final String name;}問題是行為與我預期的不同。.map()我預計,.filter()和 的每次調用都.flatMap()將在線程上執行,而ormain的每次調用都將在 nio 線程池中的線程上執行。所以我希望日志看起來像:.doOnSuccess().doOnError------- map [main] --------------- filter [main] --------------- flatmap [main] --------(and so on...)------- onsuccess [reactor-http-nio-2] --------(and so on...)但我得到的日志是:------- map [main] --------------- filter [main] --------------- flatmap [main] --------------- map [main] --------------- filter [main] --------------- flatmap [main] --------------- onsuccess [reactor-http-nio-2] --------------- onsuccess [reactor-http-nio-6] --------------- onsuccess [reactor-http-nio-4] --------------- onsuccess [reactor-http-nio-8] --------------- map [reactor-http-nio-2] --------------- filter [reactor-http-nio-2] --------------- flatmap [reactor-http-nio-2] --------------- map [reactor-http-nio-2] --------每次下一次登錄.map(),.filter()都是.flatMap()在 reactor-http-nio 的線程上完成的。下一個難以理解的事實是在主線程和 reactor-http-nio 上執行的操作之間的比率總是不同的。有時所有操作.map(),.filter()和.flatMap()都在主線程上執行。
查看完整描述

1 回答

?
慕桂英546537

TA貢獻1848條經驗 獲得超10個贊

Reactor 和 RxJava 一樣,可以被認為是并發不可知的。也就是說,它不強制執行并發模型。相反,它讓您(開發人員)掌握一切。但是,這并不妨礙該庫幫助您處理并發。

獲得aFlux或aMono并不一定意味著它運行在專用的Thread中。相反,大多數運算符繼續在前一個運算符執行的線程中工作。subscribe()除非指定,否則最頂層的運算符(源)本身在進行調用的線程上運行。

從您的代碼中,以下代碼段:

webClient.post()
?????????.uri("/resource")
?????????.syncBody(res)
?????????.header("Content-Type",?"application/json")
?????????.header("Accept",?"application/json")
?????????.retrieve()
?????????.bodyToMono(Resource.class)

導致線程從main切換到netty 的工作池。之后,以下所有操作均由 netty 工作線程執行。

如果你想控制這種行為,你應該publishOn(...)在你的代碼中添加一條語句,例如:

webClient.post()
?????????.uri("/resource")
?????????.syncBody(res)
?????????.header("Content-Type",?"application/json")
?????????.header("Accept",?"application/json")
?????????.retrieve()
?????????.bodyToMono(Resource.class)
?????????.publishOn(Schedulers.elastic())

這樣,彈性調度程序線程池將執行任何后續操作。

另一個例子是使用專用調度程序處理 HTTP 請求執行后的繁重任務。

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;

import static com.github.tomakehurst.wiremock.client.WireMock.get;

import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;


import com.github.tomakehurst.wiremock.WireMockServer;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.web.reactive.function.client.ClientResponse;

import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import reactor.core.scheduler.Schedulers;

import ru.lanwen.wiremock.ext.WiremockResolver;

import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;

import ru.lanwen.wiremock.ext.WiremockUriResolver;

import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;


@ExtendWith({

? WiremockResolver.class,

? WiremockUriResolver.class

})

public class ReactiveThreadsControlTest {


? private static int concurrency = 1;


? private final WebClient webClient = WebClient.create();


? @Test

? public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {


? ? String requestUri = "/slow-response";


? ? server.stubFor(get(urlEqualTo(requestUri))

? ? ? .willReturn(aResponse().withStatus(200)

? ? ? ? .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))

? ? );


? ? Flux

? ? ? .generate(() -> Integer.valueOf(1), (i, sink) -> {

? ? ? ? System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));

? ? ? ? sink.next(i);

? ? ? ? return i + 1;

? ? ? })

? ? ? .subscribeOn(Schedulers.single())

? ? ? .flatMap(i ->

? ? ? ? ? executeGet(uri + requestUri)

? ? ? ? ? ? .publishOn(Schedulers.elastic())

? ? ? ? ? ? .map(response -> {

? ? ? ? ? ? ? heavyTask();

? ? ? ? ? ? ? return true;

? ? ? ? ? ? })

? ? ? ? , concurrency)

? ? ? .subscribe();


? ? blockForever();

? }


? private void blockForever() {

? ? Object monitor = new Object();


? ? synchronized (monitor) {

? ? ? try {

? ? ? ? monitor.wait();

? ? ? } catch (InterruptedException ex) {

? ? ? }

? ? }

? }



? private Mono<ClientResponse> executeGet(String path) {

? ? System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));

? ? return webClient

? ? ? .get()

? ? ? .uri(path)

? ? ? .exchange();

? }


? private void heavyTask() {

? ? try {

? ? ? System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));

? ? ? Thread.sleep(TimeUnit.SECONDS.toMillis(20));

? ? } catch (InterruptedException ex) {

? ? }

? }

}


查看完整回答
反對 回復 2023-03-31
  • 1 回答
  • 0 關注
  • 200 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號