亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何從 aysnc 回調創建多個 Flux

如何從 aysnc 回調創建多個 Flux

一只名叫tom的貓 2022-05-25 10:57:06
從 Reactor 的參考指南中,我了解到Flux.create()可用于將 aysnc 回調轉換為Flux.但是,有時回調有多種方法來接收多種類型的數據,假設我有如下代碼:asrService.recognize(new Callback() {    @Override    public void stateChange(State state) {        // consume state    }    @Override    public void onResultData(Result result) {        // consume result    }});如何將其轉換為兩個反應流:Flux<State>和Flux<Result>?
查看完整描述

2 回答

?
暮色呼如

TA貢獻1853條經驗 獲得超9個贊

一種方法是使用某些處理器,如 DirectProcessor,您可以創建 2 個不同的處理器,并在事件向處理器發出項目并訂閱處理器時,但如果您仍想使用 Flux.create,您可以這樣做


    Flux<Object> objectFlux;


@Override

public void run(String... args) throws Exception {


    objectFlux = Flux.create(objectFluxSink ->

            asrService.recognize(new Callback() {

                @Override

                public void stateChange(State state) {

                    objectFluxSink.next(state);

                }


                @Override

                public void onResultData(Result result) {

                    objectFluxSink.next(state);

                }

            }));






}


public Flux<Result> getResult(){

 return    objectFlux.filter(o -> o instanceof Result)

            .map(o -> ((Result)o));

}


public Flux<State> geState(){

    return    objectFlux.filter(o -> o instanceof State)

            .map(o -> ((State)o));

}

我仍然認為使用處理器應該更清潔,你不需要做那個過濾和鑄造,但你需要有 2 個這樣的處理器:


        DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();

    DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();

    asrService.recognize(new Callback() {

        @Override

        public void stateChange(State state) {

            stateDirectProcessor.onNext(state);

        }


        @Override

        public void onResultData(Result result) {

            resultDirectProcessor.onNext(result);

        }

    });


查看完整回答
反對 回復 2022-05-25
?
ABOUTYOU

TA貢獻1812條經驗 獲得超5個贊

只是可用于給定任務的小片段。


Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

Sinks.EmitResult result = sink.tryEmitNext("some string");


查看完整回答
反對 回復 2022-05-25
  • 2 回答
  • 0 關注
  • 106 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號