亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在項目反應器中設置阻塞異步請求/響應?

如何在項目反應器中設置阻塞異步請求/響應?

冉冉說 2023-05-10 14:19:48
我正在連接一個 ANT+ USB 記憶棒,并用項目反應器替換我自己的天真“MessageBus”,因為它看起來非常合適。USB 接口本質上是異步的(單獨的輸入/輸出管道),我想以阻塞方式處理一組請求/響應消息。我已經設置了一個單獨的線程,它不斷地從 usb in-pipe 中讀取消息并將它們寫入一個接收器,該接收器提供一個共享的 Flux,任何人都可以訂閱。這似乎工作正常。目前我向 usb 管道發送一條消息,然后在共享通量上使用 .filter() 和 .blockFirst() :(人為代碼)    /**     * Puts a message on the Usb  out Pipe and waits for the relevant asynchronous response on the {@link AntUsbReader#antMessages()} {@link Flux}     *     * @param message Message to send.     * @return related response message.     */    public AntMessage sendBlocking(AntBlockingMessage message) {        send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void        // bug: ant dongle can reply to message even before following Flux is activated, meaning .blockFirst() goes in timeout.        return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite)  Flux<AntMessage>                .filter(antMessage -> antMessage.getMessageId() == message.getMessageId())                .blockFirst(Duration.ofSeconds(10));    }問題是 usb 記憶棒甚至在 flux 被激活之前就可以響應,從而導致 TimeoutException。添加一個Thread.sleep(10)到 usb 讀卡器“解決”了這個問題,但是實現這種阻塞行為的正確方法是什么?設置訂閱(使用 .take(1)),發送消息然后阻塞訂閱?設置一個發送和等待正確響應都完成的 Flux?我想不通...
查看完整描述

2 回答

?
至尊寶的傳說

TA貢獻1789條經驗 獲得超10個贊

我找到了一個可行的解決方案,但我不確定它是否是最好的:


我設置了一個用于發送異步消息的 Mono,并將其與一個過濾匹配消息的 Flux 合并??吹?Mono 從不發出值,我知道合并的第一個對象是來自我的 Flux 的響應消息,所以我可以將它轉換為正確的類型。


這仍然感覺有點臟,但話又說回來,嘗試使用用于異步工作的框架獲得阻塞行為總是感覺有點臟......


    public AntMessage sendBlocking(AntBlockingMessage requestMessage) {

        Flux<AntMessage> response = this.antUsbReader.antMessages()

                .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))

                .take(1);


        Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));

        return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));

    }


    private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {

        if (message instanceof RequestMessage) {

            return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();

        }

        return response.getMessageId() == message.getMessageId();

    }


查看完整回答
反對 回復 2023-05-10
?
慕桂英546537

TA貢獻1848條經驗 獲得超10個贊

查看您的代碼后,我會在球場上提出一些建議。我是用手機寫的,所以還沒有測試過。

但是我們先寫,然后阻塞 1 秒,然后我們返回過濾響應的提取。

Flux<AntMessage> response = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage))
    .block(Duration.ofSeconds(1))
    .thenReturn(this.antUsbReader.antMessages()
            .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
            .take(1));


查看完整回答
反對 回復 2023-05-10
  • 2 回答
  • 0 關注
  • 178 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號