2 回答

TA貢獻1789條經驗 獲得超10個贊
我找到了一個可行的解決方案,但我不確定它是否是最好的:
我設置了一個用于發送異步消息的 Mono,并將其與一個過濾匹配消息的 Flux 合并??吹?Mono 從不發出值,我知道合并的第一個對象是來自我的 Flux 的響應消息,所以我可以將它轉換為正確的類型。
這仍然感覺有點臟,但話又說回來,嘗試使用用于異步工作的框架獲得阻塞行為總是感覺有點臟......
public AntMessage sendBlocking(AntBlockingMessage requestMessage) {
Flux<AntMessage> response = this.antUsbReader.antMessages()
.filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
.take(1);
Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));
return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));
}
private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {
if (message instanceof RequestMessage) {
return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();
}
return response.getMessageId() == message.getMessageId();
}

TA貢獻1848條經驗 獲得超10個贊
查看您的代碼后,我會在球場上提出一些建議。我是用手機寫的,所以還沒有測試過。
但是我們先寫,然后阻塞 1 秒,然后我們返回過濾響應的提取。
Flux<AntMessage> response = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage)) .block(Duration.ofSeconds(1)) .thenReturn(this.antUsbReader.antMessages() .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage)) .take(1));
添加回答
舉報