首先看一段代码
nettyClient = new NettyClient(url);
nettyClient.open();
RpcContext.getContext().putAttribute(MotanConstants.ASYNC_SUFFIX, true);
Response response;
try {
response = nettyClient.request(request);
Assert.assertTrue(response instanceof ResponseFuture);
Object result = response.getValue();//获取返回结果会阻塞
RpcContext.destroy();
Assert.assertNotNull(result);
Assert.assertEquals("method: " + request.getMethodName() + " requestId: " + request.getRequestId(), result);
} catch (MotanServiceException e) {
e.printStackTrace();
assertTrue(false);
} catch (Exception e) {
e.printStackTrace();
assertTrue(false);
}主要是建立一个客户端的连接,发送请求,然后获取返回。
nettyClient.open();
open的时候会去创建一个channel的连接池,可以点进去看
所以当调用request的时候最终会走到这样一个方法
private Response request(Request request, boolean async) throws TransportException {
Channel channel;
Response response;
try {
channel = getChannel();//从连接池获取channel
if (channel == null) {
LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request));
return null;
}
// async request
response = channel.request(request);
} catch (Exception e) {
}
// aysnc or sync result
response = asyncResponse(response, async);
return response;
}
最终发起调用是在com.weibo.api.motan.transport.netty4.NettyChannel中的request方法
@Override
public Response request(Request request) throws TransportException {
int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
if (timeout <= 0) {
throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
ResponseFuture response = new DefaultResponseFuture(request, timeout, this.nettyClient.getUrl());
this.nettyClient.registerCallback(request.getRequestId(), response);
byte[] msg = CodecUtil.encodeObjectToBytes(this, codec, request);
ChannelFuture writeFuture = this.channel.writeAndFlush(msg);
boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
if (result && writeFuture.isSuccess()) {
response.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
System.out.println("完全结束");
if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
// 成功的调用
nettyClient.resetErrorCount();
} else {
// 失败的调用
nettyClient.incrErrorCount();
}
}
});
return response;
}
writeFuture.cancel(true);
response = this.nettyClient.removeCallback(request.getRequestId());
if (response != null) {
response.cancel();
}
// 失败的调用
nettyClient.incrErrorCount();
if (writeFuture.cause() != null) {
throw new MotanServiceException("NettyChannel send request to server Error: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request), writeFuture.cause());
} else {
throw new MotanServiceException("NettyChannel send request to server Timeout: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request));
}
}
可以看到首先会获取一个请求超时时间,获取服务端返回的时候会用到
然后构建一个DefaultResponseFuture,传入超时时间
ResponseFuture response = new DefaultResponseFuture(request, timeout, this.nettyClient.getUrl());
然后注册这个DefaultResponseFuture,客户端的handler会接收到同一个DefaultResponseFuture
接着发送消息给服务端
ChannelFuture writeFuture = this.channel.writeAndFlush(msg);
最后会定义一个监听器,监听整个通信是否完成。当服务端返回数据后会调用处理返回的handler
com.weibo.api.motan.transport.netty4.NettyClient中的handler
new MessageHandler() {
@Override
public Object handle(Channel channel, Object message) {
Response response = (Response) message;
System.out.println("获取到message");
ResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());
if (responseFuture == null) {
LoggerUtil.warn("NettyClient has response from server, but responseFuture not exist, requestId={}", response.getRequestId());
return null;
}
if (response.getException() != null) {
responseFuture.onFailure(response);
} else {
responseFuture.onSuccess(response);
}
return null;
}
}
可以看到如果服务端返回了,就会获取到响应的DefaultResponse
然后将返回结果传入到之前构建的responseFuture
if (response.getException() != null) {
responseFuture.onFailure(response);
} else {
responseFuture.onSuccess(response);
}
onSuccess中会调用之前的listener,执行listen回调方法关于
DefaultResponseFuture
通过对象锁protected Object lock = new Object();来保证等待结果的返回
获取返回结果会阻塞,如果服务端还没有返回
Object result = response.getValue();//获取返回结果会阻塞
@Override
public Object getValue() {
System.out.println("getValue");
synchronized (lock) {
if (!isDoing()) {
return getValueOrThrowable();//如果不是在运行就报错
}
System.out.println("timeout:"+timeout);
if (timeout <= 0) {//超时时间
try {
lock.wait();
} catch (Exception e) {
cancel(new MotanServiceException(this.getClass().getName() + " getValue InterruptedException : "
+ MotanFrameworkUtil.toString(request) + " cost=" + (System.currentTimeMillis() - createTime), e));
}
System.out.println("<= 0 getValueOrThrowable");
return getValueOrThrowable();
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime > 0) {
for (; ; ) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
}
if (!isDoing()) {
break;
} else {
waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime <= 0) {
break;
}
}
}
}
//超过设置的超时时间不返回就报错
if (isDoing()) {
timeoutSoCancel();
}
}
System.out.println("getValueOrThrowable");
return getValueOrThrowable();
}
}
如果服务端返回会唤醒getValue的阻塞。
public void onSuccess(Response response) {
this.result = response.getValue();
this.processTime = response.getProcessTime();
System.out.println("onSuccess:"+result);
done();
}
如果有阻塞,则会唤醒。點擊查看更多內容
為 TA 點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦