亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

Zookeeper Curator

1. 前言

在上一節中我們學習了 Zookeeper 的 Java 客戶端之一 ZkClient ,除了 ZkClient 之外,還有就是 Zookeeper 最流行的 Java 客戶端之一的 Curator。Curator 與 ZkClient 相比較又有什么區別呢?接下來我們就開始 Curator 的學習。

2. Curator 簡介

我們來看一下 Curator 的官網介紹:
Curator 的官網介紹
Curator 是 Netflix 公司開源的一套 Zookeeper 客戶端框架,后來捐獻給 Apache 成為頂級的開源項目。
Curator 和 ZkClient 同樣簡化了 Zookeeper 原生 API 的開發工作,而 Curator 提供了一套易用性和可讀性更強的 Fluent 風格的客戶端 API ,還提供了 Zookeeper 各種應用場景的抽象封裝,比如:分布式鎖服務、集群領導選舉、共享計數器、緩存機制、分布式隊列等。
Curator 相較其它 Zookeeper 客戶端功能更強大,應用更廣泛,使用更便捷,所以它能成為當下最流行的 Zookeeper 的 Java 客戶端之一。
接下來我們就開始學習如何使用 Curator 客戶端對 Zookeeper 服務進行操作。

Tips: Fluent 風格類似于鏈式編程,使用 Fluent 風格編寫的類,調用該類的方法會返回該類本身,然后可以繼續調用該類方法。

3. Curator 使用

我們新建一個 Spring Boot 項目來對 Curator 進行集成。首先我們要在 pom.xml 文件中加入 Curator 的 Maven 依賴。

3.1 Curator 依賴

pom.xml 文件配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.cdd</groupId>
    <artifactId>curator-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>curator-demo</name>
    <description>curator-demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

引入 Curator 的依賴后,我們先來介紹一下 Curator 的 API ,然后再編寫測試用例進行 API 測試。

3.2 Curator API

本小節我們來對 Curator 的 API 進行介紹,主要有 Curator 客戶端實例的創建,session 的重連策略,節點的添加,獲取節點數據,修改節點的數據,刪除節點等。

3.2.1 創建客戶端

我們這里講解 3 種創建客戶端的方法,Curator 客戶端的實現類為 CuratorFrameworkImpl,我們可以用它的接口 CuratorFramework 來接收創建客戶端的返回值 。

  • 第 1 種: 使用創建 Curator 客戶端的 API newClient 方法,其中第一個參數 connectString 為 Zookeeper 服務端的地址字符串,第二個參數 RetryPolicy 為會話重連策略,關于重連策略我們稍后再進行詳細的講解。
    public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
        return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
    }
    
  • 第 2 種: 在上面的 newClient 方法中,其實還是調用的下面的 newClient 方法,增加了參數 sessionTimeoutMs 會話超時時間,connectionTimeoutMs 連接超時時間。
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
        return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
    }
    
  • 第 3 種: 我們可以直接調用工廠類 CuratorFrameworkFactory 的 builder 方法,并且使用 Fluent 風格的寫法來完成客戶端的實例化。寫法如下:
    /**
    * 獲取 CuratorClient
    * 使用 Fluent 風格
    * @return CuratorFramework
    */
    public CuratorFramework getCuratorClient(){
        // 使用 CuratorFrameworkFactory 來構建 CuratorFramework
        CuratorFramework client = CuratorFrameworkFactory.builder()
            // Zookeeper 服務器地址字符串
            .connectString(connectString)
            // session 會話超時時間
            .sessionTimeoutMs(sessionTimeoutMs)
            // 使用哪種重連策略
            .retryPolicy(retryOneTime)
            // 命名空間,表示當前客戶端的父節點,我們可以用它來做業務區分
            .namespace(namespace)
            .build();
        return client;
    }
    

Curator 客戶端創建完畢后,我們使用 start 方法就可以創建會話,使用 close 方法結束會話。

client.start();
client.close();

3.2.2 會話重連策略

Curator 提供了會話重連策略的接口 RetryPolicy,并且提供了幾種默認的實現,下面我們介紹幾種常用的策略。

  1. RetryForever
    // RetryForever:間隔{參數1}毫秒后重連,永遠重試
    private RetryPolicy retryForever = new RetryForever(3000);
    
  2. RetryOneTime
    // RetryOneTime:{參數1}毫秒后重連,只重連一次
    private RetryPolicy retryOneTime = new RetryOneTime(3000);
    
  3. RetryNTimes
    // RetryNTimes: {參數2}毫秒后重連,重連{參數1}次
    private RetryPolicy retryNTimes = new RetryNTimes(3,3000);
    
  4. RetryUntilElapsed
    // RetryUntilElapsed:每{參數2}毫秒重連一次,總等待時間超過{參數1}毫秒后停止重連
    private RetryPolicy retryUntilElapsed = new RetryUntilElapsed(10000,3000);
    
  5. ExponentialBackoffRetry
    // ExponentialBackoffRetry:可重連{參數2}次,并增加每次重連之間的睡眠時間,遞增加公式如下:
    // {參數1} * Math.max(1,random.nextInt(1 << ({參數2} + 1)))
    private RetryPolicy exponential = new ExponentialBackoffRetry(1000,3);
    

Curator 的會話重連策略方案介紹完畢,我們選擇其中一種實現即可。

3.2.3 創建節點

創建好客戶端實例,開啟會話之后,我們就可以開始創建節點了,我們使用 create 方法來創建節點,Fluent 風格的方式可以讓我們自由組合創建方式。

// 節點路徑前必須加上/
String path = "/imooc";
// forPath 指定路徑創建節點,內容默認為客戶端ip。默認為持久節點。
client.create().forPath(path);
// 創建 imooc 節點,內容為 Wiki,內容參數需要字節數組。
client.create().forPath(path,"Wiki".getBytes());
// 創建節點時,同時創建它的父節點。withMode 聲明節點是什么類型的,可以使用枚舉類型CreateMode來確定。
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPa
th(path);

3.2.4 獲取節點數據

獲取節點數據我們使用 getData 方法,同時我們還可以使用 Stat 來獲取節點的最新狀態信息。

// 普通查詢
client.getData().forPath(path);
// 包含狀態的查詢
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);

3.2.5 更新節點數據

更新節點數據我們使用 setData 方法,我們可以指定 version 來更新對應版本的數據。如果 version 已過期,則拋出 BadVersionException 異常,表示更新節點數據失敗。

// 普通更新
client.setData().forPath(path,"wiki".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);

3.2.4 刪除節點

刪除節點我們使用 delete 的方法,我們可以對節點進行遞歸刪除,我們也可以指定 version 進行刪除,我們還可以強制刪除一個節點,只要當前客戶端的會話有效,客戶端在后臺就會持續進行刪除操作,直到刪除成功。

// 普通刪除
client.delete().forPath(path);
// 遞歸刪除子節點
client.delete().deletingChildrenIfNeeded().forPath(path);
// 指定版本刪除
client.delete().withVersion(1).forPath(path);
// 強制刪除
client.delete().guaranteed().forPath(path);

這里的 version 過期也會拋出 BadVersionException 異常,表示刪除失敗。
Curator 的 API 介紹完畢,我們接下來進行 API 測試。

3.3 API 測試

我們在 Spring Boot 主函數的同級新建 service 目錄,在 service 目錄中新建 CuratorService 類來獲取客戶端實例:

package cn.cdd.curatordemo.service;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.retry.RetryUntilElapsed;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class CuratorService {
    // Zookeeper 服務器地址
    @Value("${curator.connectString}")
    private String connectString;
    // session 會話超時時間
    @Value("${curator.sessionTimeoutMs}")
    private int sessionTimeoutMs;
    // 名稱空間:在操作節點時,會以 namespace 為父節點
    @Value("${curator.namespace}")
    private String namespace;

    /**
     * session 重連策略,使用其中一種即可
     */
    // RetryForever:間隔{參數1}毫秒后重連,永遠重試
    private RetryPolicy retryForever = new RetryForever(3000);
    
    // RetryOneTime:{參數1}毫秒后重連,只重連一次
    private RetryPolicy retryOneTime = new RetryOneTime(3000);
    
    // RetryNTimes: {參數2}毫秒后重連,重連{參數1}次
    private RetryPolicy retryNTimes = new RetryNTimes(3,3000);
    
    // RetryUntilElapsed:每{參數2}毫秒重連一次,總等待時間超過{參數1}毫秒后停止重連
    private RetryPolicy retryUntilElapsed = new RetryUntilElapsed(10000,3000);
    
    // ExponentialBackoffRetry:可重連{參數2}次,并增加每次重連之間的睡眠時間,增加公式如下:
    // {參數1} * Math.max(1,random.nextInt(1 << ({參數2:maxRetries} + 1)))
    private RetryPolicy exponential = new ExponentialBackoffRetry(1000,3);


    /**
     * 獲取 CuratorClient
     * 使用 Fluent 風格
     * @return CuratorFramework
     */
    public CuratorFramework getCuratorClient(){
         // 使用 CuratorFrameworkFactory 來構建 CuratorFramework
         return CuratorFrameworkFactory.builder()
                 // Zookeeper 服務器地址字符串
                 .connectString(connectString)
                 // session 會話超時時間
                 .sessionTimeoutMs(sessionTimeoutMs)
                 // 使用哪種重連策略
                 .retryPolicy(retryOneTime)
                 // 配置父節點
                 .namespace(namespace)
                 .build();
    }
}

在 application.properties 配置文件中添加配置:

# Zookeeper 地址
curator.connectString=192.168.0.77:2181,192.168.0.88:2181,192.168.0.88:2181
# 會話超時時間
curator.sessionTimeoutMs=5000
# 命名空間,當前客戶端的父節點
curator.namespace=imooc

配置完成后,在 CuratorDemoApplicationTests 測試類中編寫測試用例。
首先我們來測試節點的創建:

package cn.cdd.curatordemo;

import cn.cdd.curatordemo.service.CuratorService;
import org.apache.curator.framework.CuratorFramework;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class CuratorDemoApplicationTests {
    // 注入 CuratorService 依賴
    @Autowired
    private CuratorService curatorService;

    @Test
    void contextLoads() throws Exception {
        // 獲取客戶端
        CuratorFramework curatorClient = curatorService.getCuratorClient();
        // 開啟會話
        curatorClient.start();
        // 在 namespace 下創建節點 Mooc , 節點前需要加 “/” 表示命名空間下的子節點
        // 節點內容為 Wiki ,使用字節數組傳入
        String mooc = curatorClient.create().forPath("/Mooc", "Wiki".getBytes());
        // 返回 /Mooc
        System.out.println(mooc);
        curatorClient.close();
    }
}

控制臺輸出當前創建的節點:

/Mooc 

創建完成后我們來查詢命名空間下的子節點:

@Test
void contextLoads() throws Exception {
    // 獲取客戶端
    CuratorFramework curatorClient = curatorService.getCuratorClient();
    // 開啟會話
    curatorClient.start();
    // 查詢命名空間下的子節點
    List<String> strings = curatorClient.getChildren().forPath("/");
    System.out.println(strings);
    curatorClient.close();
}

控制臺輸出命名空間的子節點列表:

[Mooc]

Tips: 在我們創建客戶端使用了命名空間時,API 中可用 / 表示命名空間,也表示當前客戶端的根節點。

獲取節點數據測試:

@Test
void contextLoads() throws Exception {
    // 獲取客戶端
    CuratorFramework curatorClient = curatorService.getCuratorClient();
    // 開啟會話
    curatorClient.start();
    // 獲取 Mooc 節點的內容
    byte[] bytes = curatorClient.getData().forPath("/Mooc");
    // 輸出
    System.out.println(new String(bytes));
    curatorClient.close();
}

控制臺輸出當前節點的內容:

Wiki

更新節點數據測試:

@Test
void contextLoads() throws Exception {
    // 獲取客戶端
    CuratorFramework curatorClient = curatorService.getCuratorClient();
    // 開啟會話
    curatorClient.start();
	// 更新節點數據,返回當前節點狀態
    Stat stat = curatorClient.setData().forPath("/Mooc", "wiki".getBytes());
    // 輸出
    System.out.println(stat);
    curatorClient.close();
}

控制臺輸出表示當前節點狀態的數字:

4294967345,4294967352,1597805299226,1597850397723,1,0,0,0,4,0,4294967345

上面這串數字表示當前節點的狀態 Stat,我們可以查看 Stat 類來找到對應的信息:

public class Stat implements Record {
    // 創建節點時的事務 id
    private long czxid;
    // 修改節點時的事務 id
    private long mzxid;
    // 節點創建時的毫秒值
    private long ctime;
    // 節點修改時的毫秒值
    private long mtime;
    // 節點數據修改的次數
    private int version;
    // 子節點修改的次數
    private int cversion;
    // ACL修改的次數
    private int aversion;
    // 如果是臨時節點,該值為節點的 SessionId,其它類型的節點則為 0
    private long ephemeralOwner;
    // 數據長度
    private int dataLength;
    // 子節點數量
    private int numChildren;
    // 添加和刪除子節點的事務 id
    private long pzxid;
}

刪除節點數據測試:

@Test
void contextLoads() throws Exception {
    // 獲取客戶端
    CuratorFramework curatorClient = curatorService.getCuratorClient();
    // 開啟會話
    curatorClient.start();
	// 刪除節點
    curatorClient.delete().forPath("/Mooc");
    curatorClient.close();
}

執行完成后,我們再次查詢命名空間下的子節點:

@Test
void contextLoads() throws Exception {
    // 獲取客戶端
    CuratorFramework curatorClient = curatorService.getCuratorClient();
    // 開啟會話
    curatorClient.start();
    // 查詢命名空間下的子節點
    List<String> strings = curatorClient.getChildren().forPath("/");
    System.out.println(strings);
    curatorClient.close();
}

控制臺輸出為空,表示刪除成功

[]

Tips: 使用 API 時,我們需要注意是否配置 namespace ,如果沒有配置 namespace 的話,我們使用 API 進行操作時,path 參數需要填寫全路徑。如果配置了 namespace ,我們使用 API 時,Curator 會自動幫我們在 path 前加上 namespace 。

4. 總結

本節我們學習了 Curator 是什么,Curator 可以是實現什么功能,我們還介紹了 Curator 常用的 API,并做了相應的測試。以下是本節內容的總結:

  1. 為什么要學習使用 Curator 客戶端。
  2. Curator 常用的 API。
  3. 使用 Spring Boot 集成 Curator。