Java AsynchronousSocketChannel 介紹
1. 前言
Java NIO 可以編寫高性能服務器,所依賴的 I/O 事件分發機制是 Selector。Selector 的工作原理就是有一個線程會調用 Selector 的 select 方法,然后進入阻塞狀態,等待事件的發生。一旦有 I/O 事件發生,阻塞在 select 方法上的線程會返回,然后進行事件分發。其本質還是一個同步實現。
本小節將要介紹 Java 7 中引入的完全異步的編程方法,核心組件是 AsynchronousServerSocketChannel 和 AsynchronousSocketChannel 兩個類,分別用來編寫服務器和客戶端程序。 AsynchronousServerSocketChannel 和 AsynchronousSocketChannel 是在 java.nio.channels 包中引入的。
2. 基于 Future 編寫服務器程序
創建一個 AsynchronousServerSocketChannel 服務器的步驟如下:
- 創建 AsynchronousServerSocketChannel 的實例,需要通過它提供的工廠方法 open 完成。如下:
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
- 將 AsynchronousServerSocketChannel 綁定在一個本地 IP 地址或者端口。
server.bind(new InetSocketAddress("127.0.0.1", PORT));
- 向 AsynchronousServerSocketChannel 投遞一個 accept 操作。accept 調用會立即返回,不會阻塞調用線程。accept 的返回值是一個 Future 對象。
Future<AsynchronousSocketChannel> acceptFuture = server.accept();
- 通過 Future 對象的 get 方法獲取新的連接對象,返回值是 AsynchronousSocketChannel 類型的對象。注意,Future 對象的 get 方法會阻塞調用線程。get 方法接收一個 timeout 參數。
AsynchronousSocketChannel client = acceptFuture.get(10, TimeUnit.SECONDS);
- 調用 AsynchronousSocketChannel 的 read 方法,投遞一個 read 事件。注意:read 接收的參數是 ByteBuffer。read 是異步調用,不會阻塞線程。Future 的 get 調用會阻塞調用線程。
ByteBuffer inBuffer = ByteBuffer.allocate(128);
Future<Integer> readResult = client.read(inBuffer);
System.out.println("Do something");
readResult.get();
- 調用 AsynchronousSocketChannel 的 write 方法,投遞一個 write 事件。注意:write 接收的參數是 ByteBuffer。write 是異步調用,不會阻塞線程。Future 的 get 調用會阻塞調用線程。
Future<Integer> writeResult = client.write(inBuffer);
writeResult.get();
服務器完整代碼:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class AsyncServer {
private static final int PORT =56002;
public static void main(String[] args) {
try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()){
server.bind(new InetSocketAddress("127.0.0.1", PORT));
Future<AsynchronousSocketChannel> acceptFuture = server.accept();
AsynchronousSocketChannel client = acceptFuture.get(10, TimeUnit.SECONDS);
if (client != null && client.isOpen()){
ByteBuffer inBuffer = ByteBuffer.allocate(128);
Future<Integer> readResult = client.read(inBuffer);
System.out.println("Do something");
readResult.get();
inBuffer.flip();
Future<Integer> writeResult = client.write(inBuffer);
writeResult.get();
}
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 基于 Future 編寫客戶端程序
編寫客戶端程序,首先是創建 AsynchronousSocketChannel 實例,通過它的 open 方法完成。然后調用 AsynchronousSocketChannel 的 connect 方法連接服務器,同樣是異步調用,不會阻塞調用線程。調用 Future 的 get 方法獲取連接結果。剩下客戶端數據收發邏輯和服務器的數據收發邏輯一致。
客戶端完整代碼:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;
public class AsyncClient {
private static final int PORT =56002;
public static void main(String[] args) {
try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
Future<Void> result = client.connect(new InetSocketAddress("127.0.0.1", PORT));
System.out.println("Async connect the server");
result.get();
String reqMessage = "Hello server!";
ByteBuffer reqBuffer = ByteBuffer.wrap(reqMessage.getBytes());
Future<Integer> writeResult = client.write(reqBuffer);
System.out.println("Async send to server:" + reqMessage);
writeResult.get();
ByteBuffer inBuffer = ByteBuffer.allocate(128);
Future<Integer> readResult = client.read(inBuffer);
readResult.get();
System.out.println("Async recv from server:" + new String(inBuffer.array()).trim());
} catch (Exception e) {
e.printStackTrace();
}
}
}
4. 異步 I/O 操作說明
異步 Socket 編程的一個關鍵點是:AsynchronousServerSocketChannel 和 AsynchronousSocketChannel 提供的一組 I/O 操作是異步的,方法調用完后會立即返回,而不會關心操作是否完成,并不會阻塞調用線程。如果要想獲取 I/O 操作的結果,可以通過 Future 的方式,或者是 CompletionHandler 的方式。
下面列舉的 connect、accept、read、write 四組 I/O 方法,返回值是 Future 對象的 I/O 方法,前面已經介紹。還有就是需要傳入一個 attachment 參數和一個 CompletionHandler 參數,這是基于完成例程的方式。
- connect 異步操作
public abstract Future<Void> connect(SocketAddress remote);
public abstract <A> void connect(SocketAddress remote,
A attachment,
CompletionHandler<Void,? super A> handler);
- accept 異步操作
public abstract Future<AsynchronousSocketChannel> accept();
public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);
- read 異步操作
public abstract Future<Integer> read(ByteBuffer dst);
public final <A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler)
- write 異步操作
public abstract Future<Integer> write(ByteBuffer src);
public final <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler)
通過 Future 實現異步客戶端、服務器程序,盡管 I/O 相關方法調用是異步的,但是還得通過 Future 的 get 方法獲取操作的結果,而 Future 的 get 調用是同步的,所以還是沒有做到完全異步。而通過 CompletionHandler 獲取 I/O 結果,所有 I/O 操作的執行結果都會通過 CompletionHandler 回調返回。
5. 基于 CompletionHandler 編寫服務器
基于 CompletionHandler 編寫服務器,關鍵是兩步:
- 需要給每一個 I/O 操作傳入一個 attachment 參數,這是用來記錄用戶上下文信息的。在示例代碼中,我們抽象了一個類表示上下文信息。
private static class AsyncIOOP {
private int op_type;
private ByteBuffer read_buffer;
private AsynchronousSocketChannel client;
}
- 還需要傳入一個 CompletionHandler 參數,這需要你自定義一個類并且實現 CompletionHandler 接口。由于 accept 操作和其他三個操作不同,所以我們定義了兩個實現 CompletionHandler 接口的類。
private static class AsyncIOOPCompletionHandler implements CompletionHandler<Integer, AsyncIOOP>
{
}
private static class AsyncAcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, syncIOOP>
{
}
每一個 I/O 操作完成,系統都會回調 CompletionHandler 的 completed 方法,你需要覆蓋此方法,然后處理返回結果。
示例代碼實現的是一個 Echo 邏輯,關鍵步驟如下:
- 服務器啟動的時候,投遞一個 accept 操作。
- 當收到 accept 操作完成,首先投遞一個 accept 操作,準備接收新客戶端請求;然后為剛接收的客戶端投遞一個 read 操作,準備接收數據。
- 當收到 read 操作完成,向客戶端投遞一個 write 操作,發送響應給客戶端;然后再次投遞一個 read 操作,準備接收新的消息。
- 當收到 write 操作完成,我們沒有處理邏輯,因為這是一個簡單的 Echo 功能。
完整代碼如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AsyncServerCompletionHandler {
private static final int PORT =56002;
private AsynchronousServerSocketChannel server = null;
private static final int ASYNC_READ = 1;
private static final int ASYNC_WRITE = 2;
private static final int ASYNC_ACCEPT = 3;
private static final int ASYNC_CONNECT = 4;
private static class AsyncIOOP {
private int op_type;
private ByteBuffer read_buffer;
private AsynchronousSocketChannel client;
public int getOp_type() {
return op_type;
}
public void setOp_type(int op_type) {
this.op_type = op_type;
}
public ByteBuffer getRead_buffer() {
return read_buffer;
}
public void setRead_buffer(ByteBuffer read_buffer) {
this.read_buffer = read_buffer;
}
public AsynchronousSocketChannel getClient() {
return client;
}
public void setClient(AsynchronousSocketChannel client) {
this.client = client;
}
public AsyncIOOP(int op) {
this(op, null, null);
}
public AsyncIOOP(int op, ByteBuffer b) {
this(op, b, null);
}
public AsyncIOOP(int op, ByteBuffer b, AsynchronousSocketChannel ch) {
this.op_type = op;
this.read_buffer = b;
this.client = ch;
}
}
private static class AsyncIOOPCompletionHandler implements CompletionHandler<Integer, AsyncIOOP>
{
private AsyncServerCompletionHandler server;
public AsyncIOOPCompletionHandler(AsyncServerCompletionHandler server){
this.server = server;
}
@Override
public void completed(Integer result, AsyncIOOP attachment) {
if (attachment.op_type == ASYNC_READ) {
server.async_write(attachment.getClient(), "Hello Client!");
ByteBuffer inBuffer = attachment.getRead_buffer();
System.out.println("Recv message from client:" + new String(inBuffer.array()).trim());
server.async_read(attachment.getClient());
} else if (attachment.op_type == ASYNC_WRITE) {
}
}
@Override
public void failed(Throwable exc, AsyncIOOP attachment) {
try {
attachment.getClient().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static class AsyncAcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncIOOP>
{
private AsyncServerCompletionHandler server;
public AsyncAcceptCompletionHandler(AsyncServerCompletionHandler server) {
this.server = server;
}
@Override
public void completed(AsynchronousSocketChannel result, AsyncIOOP attachment) {
if (attachment.op_type == ASYNC_ACCEPT) {
server.accept_new_client();
if (result != null && result.isOpen()) {
server.async_read(result);
}
}
}
@Override
public void failed(Throwable exc, AsyncIOOP attachment) {
}
}
public void start() {
try {
server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress("127.0.0.1", PORT));
accept_new_client();
} catch (Exception e) {
e.printStackTrace();
stop();
}
}
public void accept_new_client() {
server.accept(new AsyncIOOP(ASYNC_ACCEPT), new AsyncAcceptCompletionHandler(this));
}
public void async_read(AsynchronousSocketChannel client){
ByteBuffer inBuffer = ByteBuffer.allocate(128);
AsyncIOOP ioop = new AsyncIOOP(ASYNC_READ, inBuffer, client);
client.read(inBuffer, ioop, new AsyncIOOPCompletionHandler(this));
}
public void async_write(AsynchronousSocketChannel client, String message){
ByteBuffer outBuffer = ByteBuffer.wrap(message.getBytes());
AsyncIOOP ioop = new AsyncIOOP(ASYNC_WRITE, outBuffer, client);
client.write(outBuffer, ioop, new AsyncIOOPCompletionHandler(this));
}
public void stop(){
if (server != null){
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
AsyncServerCompletionHandler server = new AsyncServerCompletionHandler();
server.start();
try {
Thread.sleep(1000*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
6. 總結
本小節重點是介紹 Java NIO2 中引入的異步 Socket 的功能。異步 Socket 的核心是每一個 I/O 方法(connect、accept、read、write)的調用只是向系統投遞一個事件,方法執行完會立即返回。如果要獲取 I/O 執行的結果,可以通過 Future 或者 CompletionHandler 獲取。Java 的這個機制非常類似 Windows IOCP(完成端口)的功能,如果有興趣可以參考[慕課網專欄][1] IOCP 一節,或者 [IOCP 相關實現代碼][2]。