2 回答

TA貢獻1853條經驗 獲得超9個贊
一種方法是使用某些處理器,如 DirectProcessor,您可以創建 2 個不同的處理器,并在事件向處理器發出項目并訂閱處理器時,但如果您仍想使用 Flux.create,您可以這樣做
Flux<Object> objectFlux;
@Override
public void run(String... args) throws Exception {
objectFlux = Flux.create(objectFluxSink ->
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
objectFluxSink.next(state);
}
@Override
public void onResultData(Result result) {
objectFluxSink.next(state);
}
}));
}
public Flux<Result> getResult(){
return objectFlux.filter(o -> o instanceof Result)
.map(o -> ((Result)o));
}
public Flux<State> geState(){
return objectFlux.filter(o -> o instanceof State)
.map(o -> ((State)o));
}
我仍然認為使用處理器應該更清潔,你不需要做那個過濾和鑄造,但你需要有 2 個這樣的處理器:
DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();
DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
stateDirectProcessor.onNext(state);
}
@Override
public void onResultData(Result result) {
resultDirectProcessor.onNext(result);
}
});

TA貢獻1812條經驗 獲得超5個贊
只是可用于給定任務的小片段。
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.EmitResult result = sink.tryEmitNext("some string");
添加回答
舉報