Netty ChannelFuture 異步監聽
1. 前言
本節主要講解 ChannelFuture ,它的作用是用來保存 Channel 異步操作的結果,可以看作是一個異步操作結果的占位符。
2. 概念
在 Netty 中所有的 IO 操作都是異步的,不能立刻得到 IO 操作的執行結果,但是可以通過注冊一個監聽器來監聽其執行結果。在 Java 的并發編程當中可以通過 Future 來進行異步結果的監聽,但是在 Netty 當中是通過 ChannelFuture 來實現異步結果的監聽。通過注冊一個監聽的方式進行監聽,當操作執行成功或者失敗時監聽會自動觸發注冊的監聽事件。
3. 應用場景
ChannelFture 在開發當中經常需要用到,可以用來監聽客戶端連接服務端的結果反饋,Netty 是異步操作,無法知道什么時候執行完成,因此可以通過 ChannelFuture 來進行執行結果的監聽。在 Netty 當中 Bind 、Write 、Connect 等操作會簡單的返回一個 ChannelFuture。
4. 核心方法
序號 | 方法 | 描述 |
---|---|---|
1 | addListener | 注冊監聽器,當操作已完成 (isDone 方法返回完成),將會通知指定的監聽器;如果 Future 對象已完成,則通知指定的監聽器 |
2 | removeListener | 移除監聽器 |
3 | sync | 等待異步操作執行完畢 |
4 | await | 等待異步操作執行完畢 |
5 | isDone | 判斷當前操作是否完成 |
6 | isSuccess | 判斷已完成的當前操作是否成功 |
7 | isCancellable | 判斷已完成的當前操作是否被取消 |
8 | cause | 獲取已完成的當前操作失敗的原因 |
sync () 和 await () 都是等待異步操作執行完成,那么它們有什么區別呢?
- sync () 會拋出異常,建議使用 sync ();
- await () 不會拋出異常,主線程無法捕捉子線程執行拋出的異常。
5. 深入了解 ChannelFuture
5.1 生命周期說明
Future 可以通過四個核心方法來判斷任務的執行情況。
狀態 | 說明 |
---|---|
isDone() | 任務是否執行完成,無論成功還是失敗 |
isSuccess() | 任務是否執行采購 |
isCancelled() | 任務是否被取消 |
cause() | 獲取執行異常信息 |
執行過程狀態的改變說明
當一個異步任務操作開始的時候,一個新的 future 對象就會被創建。在開始的時候該 future 是處于未完成的狀態,也就是說,isDone ()=false、isSuccess ()=false、isCancelled ()=false;只要該任務中任何一種狀態結束了,無論是說成功、失敗、或者被取消,那么整個 Future 就會被標記為已完成。注意的是,如果執行失敗那么 cause () 方法會返回異常信息的內容。
實例:
ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",80);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isDone()){
if(future.isSuccess()){
System.out.println("執行成功...");
}else if(future.isCancelled()){
System.out.println("任務被取消...");
}else if(future.cause()!=null){
System.out.println("執行出錯:"+future.cause().getMessage());
}
}
}
});
5.2 ChannelFuture 父接口說明
ChannelFuture 的類繼承結構,具體如下所示:
public interface ChannelFuture extends Future<Void> {
}
public interface Future<V> extends java.util.concurrent.Future<V> {
}
通過上面的繼承關系,我們可以清晰的知道 ChannelFuture 其實最頂層的接口是來自 java 并發包的 Future,java 并發包下的 Future 需要手工檢查執行結果是否已經完成,非常的繁瑣,因此 Netty 把它進行了封裝和完善,變成了自動的監聽,用起來變的非常的簡單。
java 并發包下的 Future 主要存在以下幾個缺陷:
- 只允許手動通過 get () 來檢查對應的操作是否已經完成,它是堵塞直到子線程完成執行并且返回結果;
- 只有 isDone () 方法判斷一個異步操作是否完成,但是對于完成的定義過于模糊,JDK 文檔指出正常終止、拋出異常、用戶取消都會使 isDone () 方法返回真。并不能很好的區分到底是哪種狀態。
get () 方法是堵塞的,必須等待子線程執行完成才能往下執行。
實例:
//1.定義一個子線程,實現 Callable 接口
public class ThreadTest implements Callable<Integer>{
@Override
public Integer call(){
//打印
System.out.println(">>>>>>>>子線程休眠之前");
//休眠5秒
Thread.sleep(5000);
//打印
System.out.println(">>>>>>>>子線程休眠之后");
return 1;
}
}
//2.調用子線程處理
public static void main(String[] args){
ThreadTest t=new ThreadTest();
FutureTask<Integer> future=new FutureTask<Integer>(t);
//2.1.開始執行子線程
new Thread(future).start();
//2.2.手工返回結果
int result=future.get();
System.out.println(">>>>>>>>執行結果:"+result);
//2.3.操作數據庫
userDao.updateStatus("1");
}
執行結果:
>>>>>>>>子線程休眠之前
>>>>>>>>子線程休眠之后
>>>>>>>>執行結果:1
結論總結:
- 說明了 Java 并發包的 Future 要想獲取異步執行結果,必須手工調用 get () 方法,此時雖然能獲取執行結果,但是無法知道執行結果是成功還是失敗;
- 使用 get () 獲取執行結果,但是 get () 后面的業務則被堵塞,直到后面執行完畢才會往下執行,失去了異步操作提高執行效率的意義了。
6. ChannelFuture 原理
6.1 線程堵塞
思考:sync () 和 await () 方法如何同步等待執行完成并獲取執行結果的呢?
源碼分析如下所示:
private short waiters;//計數器
@Override
public Promise<V> await() throws InterruptedException {
//1.判斷是否執行完成,如果執行完成則返回
if (isDone()) {
return this;
}
//2.線程是否已經中斷,如果中斷則拋異常
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
//3.檢查死鎖
checkDeadLock();
//4.同步代碼塊->while循環不斷的監聽執行結果
synchronized (this) {
while (!isDone()) {
incWaiters();//waiters遞增
try {
wait();//JDK 的 Object 方法,線程等待【核心】
} finally {
decWaiters();//waiters 遞減
}
}
}
return this;
}
//遞增函數
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}
//遞減函數
private void decWaiters() {
--waiters;
}
通過以上代碼,我們發現 await () 的核心其實就是調用 Object 的 wait () 方法進行線程休眠,普通的 Java 多線程知識點。
6.2 線程喚醒
思考:當前線程休眠了,那么什么時候進行喚醒呢?
源碼分析如下所示:
@Override
public Promise<V> setSuccess(V result) {
//1.setSuccess0 賦值操作
if (setSuccess0(result)) {
//2.通知執行監聽器
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
//繼續進入方法
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
//繼續進入方法
checkNotifyWaiters();
return true;
}
return false;
}
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
//核心:喚醒之前休眠的線程
notifyAll();
}
}
源碼分析總結:
- 堵塞的核心是通過 Object.wait () 方法進行休眠當前線程,普通的 Java 多線程知識;
- 執行完成之后給不同狀態(setSuccess、setFailure)賦值的時候喚醒休眠的線程;
- 喚醒線程之后調用監聽器的方法
l.operationComplete(future);
7. 小結
通過本節的學習,我們需要掌握以下幾個核心知識點:
- 掌握異步的概念,傳統 I/O 是同步堵塞的,執行 I/O 操作后線程會被阻塞住,直到操作完成;異步處理的好處是不會造成線程阻塞,可以通過 Future 來監聽異步執行的結果;
- ChannelFuture 的幾種狀態,以及它的值變化時機;
- ChannelFuture 的堵塞和喚醒源碼分析。