Zookeeper Jute
1. 前言
在我們使用 Zookeeper 客戶端和 Zookeeper 服務端建立連接,發送請求時,Zookeeper 客戶端需要把請求協議進行序列化才能進行發送,Zookeeper 服務端接收到請求,還需要把請求協議進行反序列化才能解析請求,這樣才完成了一次請求的發送。那么序列化是什么呢?我們為什么要使用序列化?Zookeeper 中又是如何使用序列化的呢?我們就帶著這些問題開始本節的內容。
2. Zookeeper 序列化
在 Zookeeper 的通信與會話一節中,我們學習了 Zookeeper 使用的是基于 TCP 的通信協議,TCP 協議是一種面向連接的、可靠的、基于字節流的通信協議。
我們想要使用 Zookeeper 客戶端向 Zookeeper 服務端發送請求,我們就需要把請求發送的 Java 對象轉換為字節流的形式,這個轉換的過程就是序列化。相對來說,把字節流轉換為 Java 對象的過程就是反序列化。
那么我們什么時候使用序列化呢?
- 對象需要持久化時;
- 對象進行網絡傳輸時。
Zookeeper 的應用場景就需要大量的網絡傳輸,所以需要使用序列化來提高 Zookeeper 客戶端與服務端之間的通信效率。
在 Zookeeper 中,這一過程不需要我們自己去實現,無論是 Zookeeper 客戶端還是 Zookeeper 服務端,都已經把序列化和反序列化的過程進行了封裝。那么 Zookeeper 時如何實現序列化的呢?接下來我們就來介紹 Zookeeper 的序列化實現方式 Jute。
2.1 Jute 介紹
Jute 前身是 Hadoop Record IO 中的序列化組件,從 Zookeeper 第一個正式版,到目前最新的穩定版本 Apache ZooKeeper 3.6.1 都是用的 Jute 作為序列化組件。
為什么 Zookeeper 會一直選擇 Jute 作為它的序列化組件呢?并不是 Jute 的性能比其他的序列化框架好,相反的,現在市面上有許多性能更好的序列化組件,比如 Apache Thrift,Apache Avro 等組件,性能都要優于 Jute。之所以還使用 Jute,是因為到目前為止,Jute 序列化還不是 Zookeeper 的性能瓶頸,沒有必要強行更換,而且很難避免因為替換基礎組件而帶來的一系列版本兼容的問題。
簡單的介紹了一下 Jute ,那么在 Zookeeper 中,Jute 又是如何實現的呢?接下來我們就來講解 Jute 在 Zookeeper 中的實現。
2.2 Jute 實現
在 Zookeeper 的通信及會話一節中,我們學習了 Zookeeper 的請求協議和響應協議,并查看了部分源碼。我們可以發現,無論是請求協議還是響應協議的具體類,都實現了接口 Record,Record 接口其實就是 Jute 定義的序列化接口。
package org.apache.jute;
import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience.Public;
@Public
public interface Record {
// 序列化
void serialize(OutputArchive var1, String var2) throws IOException;
// 反序列化
void deserialize(InputArchive var1, String var2) throws IOException;
}
我們這里使用請求頭 RequestHeader 作為例子來查看序列化和反序列化的實現:
// 請求頭,實現了 Record
public class RequestHeader implements Record {
private int xid;
private int type;
// 使用 OutputArchive 進行序列化,tag:序列化標識符
public void serialize(OutputArchive a_, String tag) throws IOException {
a_.startRecord(this, tag);
a_.writeInt(this.xid, "xid");
a_.writeInt(this.type, "type");
a_.endRecord(this, tag);
}
// 使用 InputArchive 進行反序列化,tag 序列化標識符
public void deserialize(InputArchive a_, String tag) throws IOException {
a_.startRecord(tag);
this.xid = a_.readInt("xid");
this.type = a_.readInt("type");
a_.endRecord(tag);
}
}
在 serialize 序列化方法中,根據成員變量的數據類型來選擇 OutputArchive 的方法來進行序列化操作。在 deserialize 反序列化方法中,根據成員變量的數據類型來選擇 InputArchive 的方法來進行反序列化操作。
在這里我們可以發現,Record 只是定義了序列化方法和反序列化方法,真正執行序列化操作是 OutputArchive 和 InputArchive 這兩個接口的實現類。接下來我們講解 OutputArchive 序列化接口的實現類 BinaryOutputArchive。
2.3 BinaryOutputArchive
BinaryOutputArchive 是二進制的序列化方式,這種方式將 Java 對象轉化成二進制的格式來進行數據傳輸。在它的具體方法中,使用的是 java.io.DataOutputStream 的方法來完成 Java 對象到二進制的轉換,我們來看看具體實現:
/**
* 二進制的序列化
*/
public class BinaryOutputArchive implements OutputArchive {
// 定義字節緩沖區大小
private ByteBuffer bb = ByteBuffer.allocate(1024);
// 數據輸出接口 DataOutput
private DataOutput out;
// 使用 OutputStream 初始化 BinaryOutputArchive
public static BinaryOutputArchive getArchive(OutputStream strm) {
return new BinaryOutputArchive(new DataOutputStream(strm));
}
// 構造方法傳入 DataOutput
public BinaryOutputArchive(DataOutput out) {
this.out = out;
}
// 輸出 byte 類型為二進制
public void writeByte(byte b, String tag) throws IOException {
this.out.writeByte(b);
}
// 輸出 boolean 類型為二進制
public void writeBool(boolean b, String tag) throws IOException {
this.out.writeBoolean(b);
}
// 輸出 int 類型為二進制
public void writeInt(int i, String tag) throws IOException {
this.out.writeInt(i);
}
// 輸出 long 類型為二進制
public void writeLong(long l, String tag) throws IOException {
this.out.writeLong(l);
}
// 輸出 float 類型為二進制
public void writeFloat(float f, String tag) throws IOException {
this.out.writeFloat(f);
}
// 輸出 double 類型為二進制
public void writeDouble(double d, String tag) throws IOException {
this.out.writeDouble(d);
}
// 工具方法:字符串轉字節緩沖區
private ByteBuffer stringToByteBuffer(CharSequence s) {
this.bb.clear();
int len = s.length();
for(int i = 0; i < len; ++i) {
if (this.bb.remaining() < 3) {
ByteBuffer n = ByteBuffer.allocate(this.bb.capacity() << 1);
this.bb.flip();
n.put(this.bb);
this.bb = n;
}
char c = s.charAt(i);
if (c < 128) {
this.bb.put((byte)c);
} else if (c < 2048) {
this.bb.put((byte)(192 | c >> 6));
this.bb.put((byte)(128 | c & 63));
} else {
this.bb.put((byte)(224 | c >> 12));
this.bb.put((byte)(128 | c >> 6 & 63));
this.bb.put((byte)(128 | c & 63));
}
}
this.bb.flip();
return this.bb;
}
// 輸出 String 類型為二進制
public void writeString(String s, String tag) throws IOException {
if (s == null) {
this.writeInt(-1, "len");
} else {
// String 不為空轉字節緩沖區
ByteBuffer bb = this.stringToByteBuffer(s);
this.writeInt(bb.remaining(), "len");
// 輸出 字節數組
this.out.write(bb.array(), bb.position(), bb.limit());
}
}
// 輸出 字節數組為二進制
public void writeBuffer(byte[] barr, String tag) throws IOException {
if (barr == null) {
this.out.writeInt(-1);
} else {
this.out.writeInt(barr.length);
this.out.write(barr);
}
}
}
介紹了序列化的具體實現類,接下來就是反序列化接口 InputArchive 的實現類 BinaryInputArchive。
2.4 BinaryInputArchive
BinaryInputArchive 是二進制的反序列化,也就是把二進制格式轉換成 Java 對象。在它的具體方法中,使用了 java.io.DataOutputStream 的方法來完成二進制格式到 Java 對象的轉換,我們來看具體實現:
package org.apache.jute;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
/**
* 二進制的反序列化
*/
public class BinaryInputArchive implements InputArchive {
// 長度異常的字符串
public static final String UNREASONBLE_LENGTH = "Unreasonable length = ";
// 最大緩沖區
public static final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff);
// 額外的最大緩沖區
private static final int extraMaxBuffer;
static {
final Integer configuredExtraMaxBuffer =
Integer.getInteger("zookeeper.jute.maxbuffer.extrasize", maxBuffer);
if (configuredExtraMaxBuffer < 1024) {
extraMaxBuffer = 1024;
} else {
extraMaxBuffer = configuredExtraMaxBuffer;
}
}
// 數據輸入接口 DataInput
private DataInput in;
// 最大緩沖區的大小
private int maxBufferSize;
// 額外的最大緩沖區的大小
private int extraMaxBufferSize;
// 使用 InputStream 實例化 BinaryInputArchive
public static BinaryInputArchive getArchive(InputStream strm) {
return new BinaryInputArchive(new DataInputStream(strm));
}
private static class BinaryIndex implements Index {
private int nelems;
BinaryIndex(int nelems) {
this.nelems = nelems;
}
public boolean done() {
return (nelems <= 0);
}
public void incr() {
nelems--;
}
}
// 構造方法
public BinaryInputArchive(DataInput in) {
this(in, maxBuffer, extraMaxBuffer);
}
// 構造方法
public BinaryInputArchive(DataInput in, int maxBufferSize, int extraMaxBufferSize) {
this.in = in;
this.maxBufferSize = maxBufferSize;
this.extraMaxBufferSize = extraMaxBufferSize;
}
// 讀取二進制到 Byte 類型
public byte readByte(String tag) throws IOException {
return in.readByte();
}
// 讀取二進制到 Boolean 類型
public boolean readBool(String tag) throws IOException {
return in.readBoolean();
}
// 讀取二進制到 int 類型
public int readInt(String tag) throws IOException {
return in.readInt();
}
// 讀取二進制到 long 類型
public long readLong(String tag) throws IOException {
return in.readLong();
}
// 讀取二進制到 float 類型
public float readFloat(String tag) throws IOException {
return in.readFloat();
}
// 讀取二進制到 double 類型
public double readDouble(String tag) throws IOException {
return in.readDouble();
}
// 讀取二進制到 String 類型
public String readString(String tag) throws IOException {
int len = in.readInt();
if (len == -1) {
return null;
}
checkLength(len);
byte[] b = new byte[len];
in.readFully(b);
return new String(b, StandardCharsets.UTF_8);
}
// 讀取二進制到字節數組
public byte[] readBuffer(String tag) throws IOException {
int len = readInt(tag);
if (len == -1) {
return null;
}
checkLength(len);
byte[] arr = new byte[len];
in.readFully(arr);
return arr;
}
}
3. 總結
在本節內容中我們學習了什么是序列化,為什么要使用序列化來進行數據傳輸, 以及 Zookeeper 的序列化方式 Jute 的具體實現。以下是本節內容總結:以下是本節內容總結:
- 什么是序列化。
- 什么時候使用序列化。
- Zookeeper 的序列化方式 Jute。