我正在編寫客戶端-服務器應用程序。我從數據庫中獲取數據并將其放入 rxjava2 的 ReplaySubject(ReplaySubject 是必需的,因為我需要保證每個客戶端上的數據相同)當客戶端連接訂閱它時,我想將此數據發送給他但是當我嘗試它時我的頭“可能的方式^_^”它阻止了。通過塊我的意思是它不發送數據但是當我關閉服務器數據時立即顯示在客戶端。我嘗試在客戶端和服務器端事件循環中添加一些線程(我在想可能是線程塊,因為我使用“無限”源所以接收這個我需要另一個線程或類似的東西)。服務器端通道代碼:public class ClientHandler extends SimpleChannelInboundHandler<DataWrapper> { private final Observable<DataWrapper> data; public ClientHandler(Observable<DataWrapper> data) { this.data = data; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // super.channelRegistered(ctx); final Channel channel = ctx.channel(); Server .INSTANCE .appendToChannelGroup(channel); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // super.channelActive(ctx); // i believe there is something wrong data.subscribe(ctx::writeAndFlush); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } // rest skip}客戶端:public class DirectNetworkCommunicator extends SimpleChannelInboundHandler<DataWrapper> { private Observable<DataWrapper> generatedData; private ExecutorService fallbackThread; DirectNetworkCommunicator(Observable<DataWrapper> generatedData) { this.fallbackThread = Executors.newSingleThreadExecutor(); this.generatedData = generatedData; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // super.channelRead(ctx, msg); DataWrapper inComingData = (DataWrapper) msg; Adapter .INSTANCE .appendFromNettworkData(inComingData); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // super.channelReadComplete(ctx); ctx.flush(); } // rest skip}所以我之前提到過我希望它在服務器關閉時接收數據,而不是在服務器關閉時接收數據 ^_^。如果那會幫助 netty 版本 4.1.37 final。
1 回答

汪汪一只貓
TA貢獻1898條經驗 獲得超8個贊
好的,所以未來的人們會面臨同樣的問題,我自己找到了答案。來自客戶端的 Netty 使用后臺線程作為通信的主要線程,這意味著我要等待主線程釋放,然后它才能對 observable 進行操作。希望它能幫助別人。
- 1 回答
- 0 關注
- 151 瀏覽
添加回答
舉報
0/150
提交
取消