我的WebSocketHandler實現中有這個:@Overridepublic Mono<Void> handle(WebSocketSession session) { return session.send( session.receive() .flatMap(webSocketMessage -> { int id = Integer.parseInt(webSocketMessage.getPayloadAsText()); Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id); var publisher = flux .<String>handle((o, sink) -> { try { sink.next(objectMapper.writeValueAsString(o)); } catch (JsonProcessingException e) { e.printStackTrace(); } }) .map(session::textMessage); return publisher; }) );}當前Flux<EfficiencyData>生成的用于在服務中進行測試如下:public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId) { return Flux.interval(Duration.ofSeconds(1)) .map(aLong -> { longAdder.increment(); return new EfficiencyData(new MachineSpeed( RotationSpeed.ofRpm(longAdder.intValue()), RotationSpeed.ofRpm(0), RotationSpeed.ofRpm(400))); }).publish().autoConnect();}我用publish().autoConnect()它來使它成為熱流。我創建了一個單元測試,它啟動了 2 個線程,這些線程在返回的內容上執行此操作Flux:flux.log().handle((s, sink) -> { LOGGER.info("{}", s.getMachineSpeed().getCurrent()); }).subscribe();在這種情況下,我看到兩個線程每秒都打印出相同的值。但是,當我打開 2 個瀏覽器選項卡時,我在兩個網頁中看不到相同的值。連接的 websocket 客戶端越多,值之間的差異就越大(因此原始 Flux 中的每個值似乎都發送到不同的客戶端,而不是發送到所有客戶端)。
1 回答

白板的微信
TA貢獻1883條經驗 獲得超3個贊
問題是對于每個連接的 websocket 客戶端,我都會調用該service.subscribeToEfficiencyData(id)
方法,每次調用它都會返回一個新的Flux。因此,當然,這些獨立的 Flux 不會在不同的 websocket 客戶端之間共享。
為了解決這個問題,我Flux
在構造函數或PostConstruct
我的服務的方法中創建實例,以便subscribeToEfficiencyData
每次都返回相同的 Flux 實例。
請注意,.publish().autoConnect()
在 Flux 上仍然很重要,因為沒有那個 websocket 客戶端將再次看到不同的值!
添加回答
舉報
0/150
提交
取消