亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

Zookeeper Watch

1. 前言

在我們使用 Zookeeper 來實現服務注冊與發現、配置中心、分布式通知等功能時,需要使用 Zookeeper 的核心功能 Watch,來對節點進行監聽。那么 Zookeeper 的 Watch 是如何實現對節點的監聽,并響應事件到客戶端的呢?我們就帶著這個問題開始本節的內容。

2. Watch 的實現

我們在 Zookeeper 的數據模型這一小節中學習過 Znode 節點的類型和 Znode 節點的特點,這是 Zookeeper 核心特性之一。在大多數情況下,我們都會把 Znode 與 Watch 捆綁使用,接下來我們就使用 Zookeeper 的 Java 客戶端 Curator 來實現 Watch 對 Znode 節點的監聽。
我們可以繼續使用我們在 Zookeeper Curator 這一節中創建的 Spring Boot 測試項目,在測試方法中對 Watch 進行實現。

2.1 CuratorWatcher

在我們使用 Curator 的 Fluent 風格進行鏈式調用時,我們可以使用 usingWatcher 來注冊 CuratorWatcher 來對我們的節點變化事件進行監聽:

@Test
void contextLoads() throws Exception {
    // 獲取客戶端
    CuratorFramework curatorClient = curatorService.getCuratorClient();
    // 開啟會話
    curatorClient.start();
    // CuratorWatcher 為接口,我們需要實現 process 方法
    CuratorWatcher watcher = new CuratorWatcher(){
        @Override
        // 監聽事件處理
        public void process(WatchedEvent watchedEvent) {
            // 輸出 監聽事件
            System.out.println(watchedEvent.toString());
        }
    };
    // 在命名空間下創建持久節點 mooc,內容為 Wiki
    curatorClient.create().forPath("/mooc","Wiki".getBytes());
    // 獲取 mooc 節點的 data 數據,并對 mooc 節點開啟監聽
    byte[] bytes = curatorClient.getData().usingWatcher(watcher).forPath("/mooc");
    // 輸出 data
    System.out.println(new String(bytes));
    // 第一次更新
    curatorClient.setData().forPath("/mooc", "Wiki001".getBytes());
    // 第二次更新
    curatorClient.setData().forPath("/mooc","Wiki002".getBytes());
}

控制臺輸出:

Wiki
WatchedEvent state:SyncConnected type:NodeDataChanged path:/mooc

控制臺輸出的第一行 Wiki 為 mooc 節點的 data 數據。第二行輸出的 WatchedEvent 為監聽到的事件,state 表示監聽狀態;type 表示監聽到的事件類型,我們可以判斷事件的類型來做相應的處理;path 表示監聽的節點。
介紹完 WatchedEvent,我們發現控制臺只輸出了一次 WatchedEvent,也就是說 CuratorWatcher 只進行了一次監聽。如果想要重復使用我們需要重新使用 usingWatcher 進行注冊。那么有沒有不需要重復注冊的監聽呢?接下來我們就來介紹 Curator 一種功能強大的監聽 CuratorCacheListener。

2.2 CuratorCacheListener

CuratorCacheListener 是基于 CuratorCache 緩存實現的監聽器,CuratorCache 對 ZooKeeper 事件監聽進行了封裝,能夠自動處理反復注冊監聽。我們使用 CuratorCacheListener 時,需要使用構建器 CuratorCacheListenerBuilder 來對具體的事件監聽進行構建,并且把 CuratorCacheListener 注冊到 CuratorCache 緩存中。
首先我們需要構建 CuratorCache 緩存實例,在 CuratorCache 接口中,build 為靜態方法,我們可以直接調用:

// 構建 CuratorCache 緩存實例
static CuratorCache build(CuratorFramework client, String path, CuratorCache.Options... options) {
    return builder(client, path).withOptions(options).build();
}

我們來說明以下入參:CuratorFramework client 是 Curator 客戶端;String path 是需要被監聽的節點的路徑;CuratorCache.Options... options 是對緩存設置的參數,我們可以設置以下 3 種:

public static enum Options {
    // 單節點緩存
    SINGLE_NODE_CACHE,
    // 對數據進行壓縮
    COMPRESSED_DATA,
    // CuratorCache 關閉后不清除緩存
    DO_NOT_CLEAR_ON_CLOSE;
}

構建完緩存實例,我們再來構建 CuratorCacheListener ,在 CuratorCacheListener 接口中的構建方法 builder 為靜態方法,我們可以直接調用:

// builder 方法,返回 CuratorCacheListenerBuilder 構建器,我們就可以使用具體的監聽方法了
static CuratorCacheListenerBuilder builder() {
    return new CuratorCacheListenerBuilderImpl();
}

最后我們需要把 CuratorCacheListener 注冊到 CuratorCache 中,并開啟緩存:

// 注冊 CuratorCacheListener 
cache.listenable().addListener(listener);
// 開啟緩存
cache.start();

我們來看一個完整的例子:

@Test
void contextLoads() throws Exception {
    // 獲取客戶端
    CuratorFramework client = curatorService.getCuratorClient();
    // 開啟會話
    client.start();
    // 構建 CuratorCache 實例
    CuratorCache cache = CuratorCache.build(client, "/mooc");
	// 使用 Fluent 風格和 lambda 表達式來構建 CuratorCacheListener 的事件監聽
    CuratorCacheListener listener = CuratorCacheListener.builder()
        // 開啟對所有事件的監聽
        // type 事件類型:NODE_CREATED, NODE_CHANGED, NODE_DELETED;
        // oldNode 原節點:ChildData 類,包括節點路徑,節點狀態 Stat,節點 data
        // newNode 新節點:同上
       .forAll((type, oldNode, newNode) -> {
           System.out.println("forAll 事件類型:" + type);
           System.out.println("forAll 原節點:" + oldNode);
           System.out.println("forAll 新節點:" + newNode);
       })
        // 開啟對節點創建事件的監聽
        .forCreates(childData -> {
            System.out.println("forCreates 新節點:" + childData);
        })
        // 開啟對節點更新事件的監聽
        .forChanges((oldNode, newNode) -> {
            System.out.println("forChanges 原節點:" + oldNode);
            System.out.println("forChanges 新節點:" + newNode);
        })
        // 開啟對節點刪除事件的監聽
        .forDeletes(oldNode -> {
            System.out.println("forDeletes 原節點:" + oldNode);
        })
        // 初始化
        .forInitialized(() -> {
            System.out.println("forInitialized 初始化");
        })
        // 構建
        .build();

    // 注冊 CuratorCacheListener 到 CuratorCache
    cache.listenable().addListener(listener);
    // CuratorCache 開啟緩存
    cache.start();
    // mooc 節點創建
    client.create().forPath("/mooc");
    // mooc 節點更新
    client.setData().forPath("/mooc","Wiki".getBytes());
    // mooc 節點刪除
    client.delete().forPath("/mooc");
}

我們來查看 CuratorCacheListenerBuilder 接口中具體的事件監聽,我們需要監聽哪種事件就使用哪種方法:

public interface CuratorCacheListenerBuilder {
    // 全部事件
    CuratorCacheListenerBuilder forAll(CuratorCacheListener var1);
	// 創建事件
    CuratorCacheListenerBuilder forCreates(Consumer<ChildData> var1);
	// 更新事件
    CuratorCacheListenerBuilder forChanges(CuratorCacheListenerBuilder.ChangeListener var1);
	// 創建和更新事件
    CuratorCacheListenerBuilder forCreatesAndChanges(CuratorCacheListenerBuilder.ChangeListener var1);
	// 刪除事件
    CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> var1);
	// 初始化后開啟線程異步執行
    CuratorCacheListenerBuilder forInitialized(Runnable var1);
	// 子節點的事件
    CuratorCacheListenerBuilder forPathChildrenCache(String var1, CuratorFramework var2, PathChildrenCacheListener var3);
	// 節點本身的事件和子節點的事件
    CuratorCacheListenerBuilder forTreeCache(CuratorFramework var1, TreeCacheListener var2);
	// 節點本身的事件
    CuratorCacheListenerBuilder forNodeCache(NodeCacheListener var1);
 	// 初始化后開啟監聽
    CuratorCacheListenerBuilder afterInitialized();
	// 構建方法
    CuratorCacheListener build();
	/*
	* 更新事件時被調用
	*/
    @FunctionalInterface
    public interface ChangeListener {
        void event(ChildData var1, ChildData var2);
    }
}

接下來我們執行測試方法,查看控制臺輸出:

forInitialized 初始化
forAll 事件類型:NODE_CREATED
forAll 原節點:null
forAll 新節點:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forCreates 新節點:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 事件類型:NODE_CHANGED
forAll 原節點:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 新節點:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forChanges 原節點:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forChanges 新節點:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 事件類型:NODE_DELETED
forAll 原節點:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 新節點:null
forDeletes 原節點:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}

我們發現,我們設置的 create,setData,delete 這 3 種事件都被監聽到了,而且 forAll 每一種事件都監聽到了,所以我們在使用的時候,選擇我們需要的事件監聽即可。
介紹完 CuratorCacheListener 監聽器,并完成了事件監聽的測試,那么 Zookeeper 的 Watch 是如何運行的呢?接下來我們就來介紹 Watch 的運行原理。

3. Watch 的原理

在介紹 Watch 的原理之前,我們先熟悉一個概念:Zookeeper 客戶端對 Znode 的寫操作,也就是新增節點、更新節點、刪除節點這些操作,默認會開啟監聽;Zookeeper 客戶端對 Znode 的讀操作,也就是查詢節點數據、查詢節點是否存在、查詢子節點等操作,需要手動設置開啟監聽。這也是為什么在 GetDataRequest 請求體中會有 watch 這個屬性的原因。
Watch 的運行過程分為 4 部分,分別是:客戶端注冊 Watch 、服務端注冊 Watch、服務端觸發 Watch、客戶端處理回調。
Watch 的運行過程

  • 客戶端注冊 Watch
    當我們使用 Zookeeper 客戶端向 Zookeeper 服務端發送帶有事件監聽的請求時,Zookeeper 客戶端會把該請求標記成帶有 Watch 的請求,然后把 Watch 監聽器注冊到 ListenerManager 中。

  • 服務端注冊 Watch
    Zookeeper 服務端接收到 Zookeeper 客戶端發送過來的請求,解析請求體,判斷該請求是否帶有 Watch 事件,如果有 Watch 事件,就會把 Watch 事件注冊到 WatchManager 中。

  • 服務端觸發 Watch
    Zookeeper 服務端注冊完 Watch 事件后,會調用 WatchManager 的 triggerWatch 方法來觸發 Watch 事件,Watch 事件完成后,向客戶端發送響應。

  • 客戶端處理回調
    Zookeeper 客戶端接收到 Zookeeper 服務端的響應后,解析響應體,根據響應體的類型去 ListenerManager 中查找相對應的 Watch 監聽器,然后觸發監聽器的回調函數。

4. 總結

在本節中,我們學習了使用 Curator 的兩種方式開啟對事件的監聽,也了解了 Watch 運行過程的 4 個部分。以下是本節內容的總結:

  1. Zookeeper Watch 的實現。
  2. Zookeeper Watch 的原理。