亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

當我從 ReplaySubject 使用 Observable 時阻止

當我從 ReplaySubject 使用 Observable 時阻止

慕仙森 2023-05-17 17:56:28
我正在編寫客戶端-服務器應用程序。我從數據庫中獲取數據并將其放入 rxjava2 的 ReplaySubject(ReplaySubject 是必需的,因為我需要保證每個客戶端上的數據相同)當客戶端連接訂閱它時,我想將此數據發送給他但是當我嘗試它時我的頭“可能的方式^_^”它阻止了。通過塊我的意思是它不發送數據但是當我關閉服務器數據時立即顯示在客戶端。我嘗試在客戶端和服務器端事件循環中添加一些線程(我在想可能是線程塊,因為我使用“無限”源所以接收這個我需要另一個線程或類似的東西)。服務器端通道代碼:public    class ClientHandler        extends SimpleChannelInboundHandler<DataWrapper> {    private final Observable<DataWrapper> data;    public ClientHandler(Observable<DataWrapper> data) {        this.data = data;    }    @Override    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {        // super.channelRegistered(ctx);        final Channel channel = ctx.channel();        Server            .INSTANCE            .appendToChannelGroup(channel);    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // super.channelActive(ctx);        // i believe there is something wrong        data.subscribe(ctx::writeAndFlush);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }    // rest skip}客戶端:public    class DirectNetworkCommunicator        extends SimpleChannelInboundHandler<DataWrapper> {    private Observable<DataWrapper> generatedData;    private ExecutorService fallbackThread;    DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {        this.fallbackThread = Executors.newSingleThreadExecutor();        this.generatedData = generatedData;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        // super.channelRead(ctx, msg);        DataWrapper inComingData = (DataWrapper) msg;        Adapter            .INSTANCE            .appendFromNettworkData(inComingData);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        // super.channelReadComplete(ctx);        ctx.flush();    }    // rest skip}所以我之前提到過我希望它在服務器關閉時接收數據,而不是在服務器關閉時接收數據 ^_^。如果那會幫助 netty 版本 4.1.37 final。
查看完整描述

1 回答

?
汪汪一只貓

TA貢獻1898條經驗 獲得超8個贊

好的,所以未來的人們會面臨同樣的問題,我自己找到了答案。來自客戶端的 Netty 使用后臺線程作為通信的主要線程,這意味著我要等待主線程釋放,然后它才能對 observable 進行操作。希望它能幫助別人。



查看完整回答
反對 回復 2023-05-17
  • 1 回答
  • 0 關注
  • 151 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號