亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

首頁 慕課教程 Netty 教程 Netty 教程 Netty ChannelFuture 異步監聽

Netty ChannelFuture 異步監聽

1. 前言

本節主要講解 ChannelFuture ,它的作用是用來保存 Channel 異步操作的結果,可以看作是一個異步操作結果的占位符。

2. 概念

在 Netty 中所有的 IO 操作都是異步的,不能立刻得到 IO 操作的執行結果,但是可以通過注冊一個監聽器來監聽其執行結果。在 Java 的并發編程當中可以通過 Future 來進行異步結果的監聽,但是在 Netty 當中是通過 ChannelFuture 來實現異步結果的監聽。通過注冊一個監聽的方式進行監聽,當操作執行成功或者失敗時監聽會自動觸發注冊的監聽事件。

3. 應用場景

ChannelFture 在開發當中經常需要用到,可以用來監聽客戶端連接服務端的結果反饋,Netty 是異步操作,無法知道什么時候執行完成,因此可以通過 ChannelFuture 來進行執行結果的監聽。在 Netty 當中 Bind 、Write 、Connect 等操作會簡單的返回一個 ChannelFuture。

4. 核心方法

序號 方法 描述
1 addListener 注冊監聽器,當操作已完成 (isDone 方法返回完成),將會通知指定的監聽器;如果 Future 對象已完成,則通知指定的監聽器
2 removeListener 移除監聽器
3 sync 等待異步操作執行完畢
4 await 等待異步操作執行完畢
5 isDone 判斷當前操作是否完成
6 isSuccess 判斷已完成的當前操作是否成功
7 isCancellable 判斷已完成的當前操作是否被取消
8 cause 獲取已完成的當前操作失敗的原因

sync () 和 await () 都是等待異步操作執行完成,那么它們有什么區別呢?

  1. sync () 會拋出異常,建議使用 sync ();
  2. await () 不會拋出異常,主線程無法捕捉子線程執行拋出的異常。

5. 深入了解 ChannelFuture

5.1 生命周期說明

Future 可以通過四個核心方法來判斷任務的執行情況。

狀態 說明
isDone() 任務是否執行完成,無論成功還是失敗
isSuccess() 任務是否執行采購
isCancelled() 任務是否被取消
cause() 獲取執行異常信息

執行過程狀態的改變說明

當一個異步任務操作開始的時候,一個新的 future 對象就會被創建。在開始的時候該 future 是處于未完成的狀態,也就是說,isDone ()=false、isSuccess ()=false、isCancelled ()=false;只要該任務中任何一種狀態結束了,無論是說成功、失敗、或者被取消,那么整個 Future 就會被標記為已完成。注意的是,如果執行失敗那么 cause () 方法會返回異常信息的內容。

圖片描述

實例:

ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",80);
channelFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if(future.isDone()){
            if(future.isSuccess()){
                System.out.println("執行成功...");
            }else if(future.isCancelled()){
                System.out.println("任務被取消...");
            }else if(future.cause()!=null){
                System.out.println("執行出錯:"+future.cause().getMessage());
            }
        }
    }
});

5.2 ChannelFuture 父接口說明

ChannelFuture 的類繼承結構,具體如下所示:

public interface ChannelFuture extends Future<Void> {
    
}
public interface Future<V> extends java.util.concurrent.Future<V> {
    
}

通過上面的繼承關系,我們可以清晰的知道 ChannelFuture 其實最頂層的接口是來自 java 并發包的 Future,java 并發包下的 Future 需要手工檢查執行結果是否已經完成,非常的繁瑣,因此 Netty 把它進行了封裝和完善,變成了自動的監聽,用起來變的非常的簡單。

java 并發包下的 Future 主要存在以下幾個缺陷:

  1. 只允許手動通過 get () 來檢查對應的操作是否已經完成,它是堵塞直到子線程完成執行并且返回結果;
  2. 只有 isDone () 方法判斷一個異步操作是否完成,但是對于完成的定義過于模糊,JDK 文檔指出正常終止、拋出異常、用戶取消都會使 isDone () 方法返回真。并不能很好的區分到底是哪種狀態。

get () 方法是堵塞的,必須等待子線程執行完成才能往下執行。

實例:

//1.定義一個子線程,實現 Callable 接口
public class ThreadTest implements Callable<Integer>{
    @Override
    public Integer call(){
	    //打印
    	System.out.println(">>>>>>>>子線程休眠之前");
	    //休眠5秒
	    Thread.sleep(5000);
    	//打印
    	System.out.println(">>>>>>>>子線程休眠之后");
        return 1;
	}
}
//2.調用子線程處理
public static void main(String[] args){
    ThreadTest t=new ThreadTest();
    FutureTask<Integer> future=new FutureTask<Integer>(t);
    //2.1.開始執行子線程
    new Thread(future).start();
    
  	//2.2.手工返回結果
    int result=future.get();
    System.out.println(">>>>>>>>執行結果:"+result);
    //2.3.操作數據庫
    userDao.updateStatus("1");
}

執行結果:

>>>>>>>>子線程休眠之前
>>>>>>>>子線程休眠之后
>>>>>>>>執行結果:1

結論總結:

  1. 說明了 Java 并發包的 Future 要想獲取異步執行結果,必須手工調用 get () 方法,此時雖然能獲取執行結果,但是無法知道執行結果是成功還是失敗;
  2. 使用 get () 獲取執行結果,但是 get () 后面的業務則被堵塞,直到后面執行完畢才會往下執行,失去了異步操作提高執行效率的意義了。

6. ChannelFuture 原理

6.1 線程堵塞

思考:sync () 和 await () 方法如何同步等待執行完成并獲取執行結果的呢?

源碼分析如下所示:

private short waiters;//計數器

@Override
public Promise<V> await() throws InterruptedException {
    //1.判斷是否執行完成,如果執行完成則返回
    if (isDone()) {
        return this;
    }

    //2.線程是否已經中斷,如果中斷則拋異常
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    //3.檢查死鎖
    checkDeadLock();

    //4.同步代碼塊->while循環不斷的監聽執行結果
    synchronized (this) {
        while (!isDone()) {
            incWaiters();//waiters遞增
            try {
                wait();//JDK 的 Object 方法,線程等待【核心】
            } finally {
                decWaiters();//waiters 遞減
            }
        }
    }
    return this;
}

//遞增函數
private void incWaiters() {
    if (waiters == Short.MAX_VALUE) {
        throw new IllegalStateException("too many waiters: " + this);
    }
    ++waiters;
}

//遞減函數
private void decWaiters() {
    --waiters;
}

通過以上代碼,我們發現 await () 的核心其實就是調用 Object 的 wait () 方法進行線程休眠,普通的 Java 多線程知識點。

6.2 線程喚醒

思考:當前線程休眠了,那么什么時候進行喚醒呢?

源碼分析如下所示:

@Override
public Promise<V> setSuccess(V result) {
    //1.setSuccess0 賦值操作
    if (setSuccess0(result)) {
        //2.通知執行監聽器
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

private boolean setSuccess0(V result) {
    //繼續進入方法
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        
        //繼續進入方法
        checkNotifyWaiters();
        return true;
    }
    return false;
}

private synchronized void checkNotifyWaiters() {
    if (waiters > 0) {
        //核心:喚醒之前休眠的線程
        notifyAll();
    }
}

源碼分析總結:

  1. 堵塞的核心是通過 Object.wait () 方法進行休眠當前線程,普通的 Java 多線程知識;
  2. 執行完成之后給不同狀態(setSuccess、setFailure)賦值的時候喚醒休眠的線程;
  3. 喚醒線程之后調用監聽器的方法 l.operationComplete(future);

7. 小結

通過本節的學習,我們需要掌握以下幾個核心知識點:

  1. 掌握異步的概念,傳統 I/O 是同步堵塞的,執行 I/O 操作后線程會被阻塞住,直到操作完成;異步處理的好處是不會造成線程阻塞,可以通過 Future 來監聽異步執行的結果;
  2. ChannelFuture 的幾種狀態,以及它的值變化時機;
  3. ChannelFuture 的堵塞和喚醒源碼分析。