1. Zuul 介绍
Zuul 在 Spring Cloud 微服务体系中担任很重要的角色--服务网关,是基于JVM的路由器和负载均衡器。
Zuul 的基本使用以及 Filter 的介绍就不在这说明了,本文主要介绍 Zuul 的原理。
2. Zuul 处理流程
处理流程如下:
Request => ZuulHandlerMapping => ZuulController => ZuulServlet
主要的接收逻辑都在 ZuulServlet 中,执行 Filter 的逻辑,根据 Filter 的类型依次执行,如下代码:
try {
preRoute();
} catch (ZuulException e) {
error(e);
postRoute(); return;
}try {
route();
} catch (ZuulException e) {
error(e);
postRoute(); return;
}try {
postRoute();
} catch (ZuulException e) {
error(e); return;
}接收的代码已经清楚了,其实 Zuul 组件的功能就到这边了,剩下对请求进行路由其实主要使用了Ribbon 组件进行的,因此下面与其说是介绍 Zuul 到不如说是 Ribbon 的介绍。
路由的逻辑处理主要是 route() 即 Route Filter 进行的。
3. Route Filter
Zuul 中 Route Filter 有 SimpleHostRoutingFilter 和 RibbonRoutingFilter, 有人说还有 SendForwardFilter(本地的先不关注)。
3.1 SimpleHostRoutingFilter
当你配置路由时,直接配置 Url 而不是 serviceId ,那么就是使用的 SimpleHostRoutingFilter,相反就是用的 RibbonRoutingFilter 。
主要逻辑:
public Object run() { // 省略没用逻辑 ...
String uri = this.helper.buildZuulRequestURI(request); this.helper.addIgnoredHeaders(); try { // forward 主要逻辑
CloseableHttpResponse response = forward(this.httpClient, verb, uri, request, headers, params, requestEntity);
setResponse(response);
} catch (Exception ex) { throw new ZuulRuntimeException(ex);
}
}// 从返回就能看出来调用 httpClient 完成的http请求private CloseableHttpResponse forward () {// ...
// forwardRequest
CloseableHttpResponse zuulResponse = forwardRequest(httpclient, httpHost, httpRequest); return zuulResponse;// ...}// 通过 httpClient 发请求private CloseableHttpResponse forwardRequest(CloseableHttpClient httpclient,
HttpHost httpHost, HttpRequest httpRequest) throws IOException { return httpclient.execute(httpHost, httpRequest);
}总结:构建 Request 然后通过 httpClient 进行请求。
3.2 RibbonRoutingFilter
public Object run() { // ...
// 构建请求上下文,其实就是保护一下参数,如serviceId, retryable, url, 原request等
RibbonCommandContext commandContext = buildCommandContext(context); // forward
ClientHttpResponse response = forward(commandContext);
setResponse(response); return response;
}protected ClientHttpResponse forward(RibbonCommandContext context) { // ...
// 这里的重点转到 command 上了,主要逻辑都是 command 中执行
RibbonCommand command = this.ribbonCommandFactory.create(context);
ClientHttpResponse response = command.execute(); return response;
}4. Command
4.1 继承关系
HystrixCommandRibbonCommand <- AbstractRibbonCommand <- HttpClientRibbonCommand / RestClientRibbonCommand / OkHttpRibbonCommand
主要的逻辑都是 AbstractRibbonCommand 中,子类是不同选型 HttpClient, OkHttp 和 HttpURLConnection。
4.2 AbstractRibbonCommand
由于继承的 HystrixCommand 所以需要实现 run() 方法,上面调用execute() 是来自 HystrixCommand 主体逻辑需要 run() 中实现。
protected ClientHttpResponse run() throws Exception { final RequestContext context = RequestContext.getCurrentContext(); // 根据不同实现创建不同的Request
RQ request = createRequest(); // 执行负载均衡逻辑,其中 client 是 ribbonCommandFactory.create 中置入的
RS response = this.client.executeWithLoadBalancer(request, config);
context.set("ribbonResponse", response); if (this.isResponseTimedOut()) { if (response != null) {
response.close();
}
} return new RibbonHttpResponse(response);
}4.3 RibbonCommandFactory
从 CommandFactory 观察下 client 是什么类,做什么事情。
ribbonCommandFactory 的继承关系:
RibbonCommandFactory <- AbstractRibbonCommandFactory <- HttpClientRibbonCommandFactory OkHttpRibbonCommandFactory RestClientRibbonCommandFactory
以默认的 HttpClientRibbonCommandFactory 为例,代码如下:
public HttpClientRibbonCommand create(final RibbonCommandContext context) { // 获取降级处理
ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId()); // serviceId 是根据请求的url来比对配置的路由得到的
final String serviceId = context.getServiceId(); // clientFactory 根据 ServiceId 获取相关的组件,包括 IRule, IClientConfig, ILoadBalancer 等组件
final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
serviceId, RibbonLoadBalancingHttpClient.class);
client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId)); // 不同类型的 Factory 会获取不同的 client,生成不同的 Command
return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
clientFactory.getClientConfig(serviceId));
}由此可见 client 就是 RibbonLoadBalancingHttpClient ,当然其他实现也会对应不一样的 client。
4.4 Client
client 的继承比较复杂,从主要的继承看:
LoadBalancerContext <- AbstractLoadBalancerAwareClient <- AbstractLoadBalancingClient <- RibbonLoadBalancingHttpClient OkHttpLoadBalancingClient
4.2 中 this.client.executeWithLoadBalancer 执行的是:
// AbstractLoadBalancerAwareClient.executeWithLoadBalancer()public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { // 重试策略处理类,要判断是不是重试可以重写这个
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig); // 专门用于失败切换其他服务端进行重试的 Command
LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
.withLoadBalancerContext(this)
.withRetryHandler(handler)
.withLoadBalancerURI(request.getUri())
.build(); try { // 见下面分析
return command.submit( new ServerOperation<T>() { @Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri); try { // 真实动作,执行 execute
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) { return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t;
} else { throw new ClientException(e);
}
}
}这里有两个地方需要注重看的:
LoadBalancerCommand 实现
execute() 逻辑
先看 execute(), LoadBalancerCommand 单独介绍
4.4.1 execute()
以默认的 RibbonLoadBalancingHttpClient 为例。(RibbonLoadBalancingHttpClient 将会被改名为ApacheHttpLoadBalancingClient,因为它的兄弟叫OkHttpLoadBalancingClient这样看上去比较像比较对称)。
public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,final IClientConfig configOverride) { // ...
final HttpUriRequest httpUriRequest = request.toRequest(requestConfig); // delegate == httpClient, 是通过 createDelegate 创建的,那么 okHttp 的相应的地方就是 okHttp 对应的 client
// 所以,这里就是发送普通的 http 请求
final HttpResponse httpResponse = this.delegate.execute(httpUriRequest); return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}4.5 LoadBalancerCommand
主要逻辑都是在 submit 中, 使用了 rxJava 的特性进行重试, 下面删除了很多细节代码,剩下主干重试逻辑。
public Observable<T> submit(final ServerOperation<T> operation) { // ...
// 外层的 observable 为了不同目标的重试
// selectServer() 是进行负载均衡,返回的是一个 observable,可以重试,重试时再重新挑选一个目标server
Observable<T> o = selectServer().concatMap(server -> { // 这里又开启一个 observable 主要是为了同机重试
Observable<T> o = Observable
.just(server)
.concatMap(server -> { return operation.call(server).doOnEach(new Observer<T>() { @Override
public void onCompleted() { // server 状态的统计,譬如消除联系异常,抵消activeRequest等
}
@Override
public void onError() { // server 状态的统计,错误统计等
}
@Override
public void onNext() { // 获取 entity, 返回内容
}
});
}) // 如果设置了同机重试,进行重试
if (maxRetrysSame > 0)
// retryPolicy 判断是否重试,具体分析看下面
o = o.retry(retryPolicy(maxRetrysSame, true)); return o;
})
// 设置了异机重试,进行重试
if (maxRetrysNext > 0)
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(exp -> { return Observable.error(e);
});
}主要的重试逻辑如上,但是细节需要分析的:
retryPolicy()
selectServer()
目标 Server 状态记录
4.5.1 retryPolicy
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) { return new Func2<Integer, Throwable, Boolean>() { @Override
public Boolean call(Integer tryCount, Throwable e) { if (e instanceof AbortExecutionException) { return false;
} // 重构总次数还是会放弃重试的
if (tryCount > maxRetrys) { return false;
}
if (e.getCause() != null && e instanceof RuntimeException) {
e = e.getCause();
}
// 判断 exception 是否进行重试,可以自定义 handler 进行定制化
return retryHandler.isRetriableException(e, same);
}
};
}4.5.2 selectServer
private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { @Override
public void call(Subscriber<? super Server> next) { try { // 通过 loadBalancerContext.getServerFromLoadBalancer 来进行负载均衡
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server); next.onCompleted();
} catch (Exception e) { next.onError(e);
}
}
});
}loadBalancerContext.getServerFromLoadBalancer 进行负载均衡选择下一个请求目标,整个方法比较大,不列出了,把调用关系列出后分析主要的逻辑类。
loadBalancerContext.getServerFromLoadBalancer () > lb.chooseServer()
实际在作用的是 ILoadBalancer.chooseServer 方法。
4.5.3 ILoadBalancer
ILoadBalancer 继承关系:
ILoadBalancer <- AbstractLoadBalancer <- BaseLoadBalancer <- DynamicServerListLoadBalancer <- ZoneAwareLoadBalancer
ILoadBalancer 接口:
public interface ILoadBalancer { void addServers(List<Server> newServers); Server chooseServer(Object key); // 主要逻辑
void markServerDown(Server server); @Deprecated
List<Server> getServerList(boolean availableOnly); List<Server> getReachableServers(); List<Server> getAllServers();
}实现负载均衡的逻辑的类 BaseLoadBalancer, DynamicServerListLoadBalancer 加入动态 ServerList 的功能,负载均衡逻辑并没有补充。
BaseLoadBalancer.chooseServer 主要逻辑代码:
public Server chooseServer(Object key) { if (counter == null) {
counter = createCounter();
}
counter.increment(); if (rule == null) { return null;
} else { try { // Rule 执行挑选逻辑
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null;
}
}
}4.6 IRule
IRule<- AbstractLoadBalancerRule <- ClientConfigEnabledRoundRobinRule // abstract <- BestAvailableRule // 最小连接优先轮询 PredicateBasedRule // abstract <- AvailabilityFilteringRule <- ZoneAvoidanceRule <- RoundRobinRule <- WeightedResponseTimeRule <- RandomRule <- RetryRule
4.6.1 PredicateBasedRule
基于逻辑断言进行判断是否选择的 Rule, 具体 Predicate 继承如下:
Predicate <- AbstractServerPredicate <- AvailabilityPredicate // 可用性判断 ZoneAvoidancePredicate // 区域选择 CompositePredicate // 复合判断自身没有逻辑,组合其他 Predicate
AvailabilityPredicate
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats(); if (stats == null) { return true;
} // 判断是否跳过
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) { // 如果处于不可用 或者 当前请求大于最大限制 时跳过该目标
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true;
} return false;
}ZoneAvoidancePredicate
public boolean apply(@Nullable PredicateKey input) { // ...
// 选择出可用区域,具体逻辑在 ZoneAvoidanceRule 中解析
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); if (availableZones != null) { return availableZones.contains(input.getServer().getZone());
} else { return false;
}
}CompositePredicate 组合逻辑断言
CompositePredicate// 使用多个 Predicate 组成判断的 And 逻辑链// 类似 if xx && yy & oo Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);// 获取可用列表时使用到回退逻辑public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
Iterator<AbstractServerPredicate> i = fallbacks.iterator(); // 当筛选下来的server个数不符合配置中的最小个数时,会进行回退重选,一直回退到符合要求或者没有回退逻辑
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
} return result;
}// AbstractServerPredicate 上面 super.getEligibleServers public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { if (loadBalancerKey == null) { return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList(); for (Server server: servers) { // 每个 server 经过逻辑断言进行判断进行筛选
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
} return results;
}
}三大 Predicate 已经介绍完毕,回到主题。
PredicateBasedRule 主要逻辑:
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer(); // 基于逻辑断言进行轮询 Predicate 由子类决定
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get();
} else { return null;
}
}// AbstractServerPredicatepublic Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { // 过滤可用结果, getEligibleServers 上面已经解析
List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent();
} // 标准轮询
return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size()));
}4.6.2 AvailabilityFilteringRule
AvailabilityFilteringRule 目标可用性轮询
public Server choose(Object key) { int count = 0;
Server server = roundRobinRule.choose(key); while (count++ <= 10) { // 逻辑判断
if (predicate.apply(new PredicateKey(server))) { return server;
} // 轮询
server = roundRobinRule.choose(key);
} return super.choose(key);
}// 其中 predicate// CompositePredicate 组合逻辑,这里只有 AvailabilityPredicate 可用性判断predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();4.6.3 ZoneAvoidanceRule
ZoneAvoidanceRule 没有重写 choose 方法,所以还是继承了 PredicateBasedRule,所以过滤逻辑其实就是 compositePredicate. getEligibleServers,而经过上面的解析,getEligibleServers 其实就是所有 server 进行逻辑判断,把通过的返回。
// Predicate 组合了 zonePredicate 和 availabilityPredicatecompositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
可见主要是 zonePredicate 和 availabilityPredicate 的逻辑判断。
zonePredicate 上面分析主要调用 ZoneAvoidanceRule.getAvailableZones
// getAvailableZones 主要逻辑for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) { String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); int instanceCount = zoneSnapshot.getInstanceCount(); // 没有实例 即排除
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else { double loadPerServer = zoneSnapshot.getLoadPerServer(); // 不可用率超过阀值 或者 区域本来就不可用,即排除
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else { // 过滤出 负载最高的几个区域
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) { // they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}// 没有排除 并且 最高负载没有超过限制,返回if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) { // zone override is not needed here
return availableZones;
}// 否则 随机排除一个负载高的区域String zoneToAvoid = randomChooseZone(snapshot, worstZones);if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}return availableZones;这里有个问题:为啥当存在排除时即便没有超过限制负载也要排除一个区域?
4.6.4 RoundRobinRule
比较简单,如下:
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) {
log.warn("no load balancer"); return null;
}
Server server = null; int count = 0; while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb); return null;
} // 累加取模,标准轮询
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
// 非线程安全list,可能会导致size有了对应索引处元素没有同步过来
if (server == null) { /* Transient. */
Thread.yield(); continue;
} // 可用即返回,不然下一轮
if (server.isAlive() && (server.isReadyToServe())) { return (server);
} // Next.
server = null;
} // 超过10次没有获取到可用的server
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
} return server;
}4.6.5 WeightedResponseTimeRule
// 这里会启动一个维持 使用响应时间计算比重系数 的任务 DynamicServerWeightTask// 主要公式// totalResponseTime 为所有server 平均响应时间的和,由下公式知,响应越快 weight 越大// weight = totalResponseTime - ss.getResponseTimeAvg();// weightSoFar += weight;// finalWeights.add(weightSoFar); // 0 - maxTotalWeight 的概率假设是平均的,那么 weight 越大区间就越大被选中的概率就越大// 如 Aw(10) Bw(30) Cw(40) Dw(20)// weightSoFar: 10, 40, 80, 100// 那么 0-10, 10-40, 40-80, 80-100 可以加 40-80区间最大,概率就越大double randomWeight = random.nextDouble() * maxTotalWeight;// pick the server index based on the randomIndexint n = 0;for (Double d : currentWeights) { if (d >= randomWeight) {
serverIndex = n; break;
} else {
n++;
}
}4.7 DynamicServerListLoadBalancer
继承自 BaseLoadBalancer 跟 BaseLoadBalancer 不同的是它持有 ServerList 对象来进行动态的获取 Server 列表。
4.7.1 ServerList
ServerList <- DynamicServerList <- DiscoveryEnabledNIWSServerList
public interface ServerList<T extends Server> { public List<T> getInitialListOfServers(); public List<T> getUpdatedListOfServers();
}DynamicServerList: 定时地从一个RouteStore中获取DiscoveryEnabledNIWSServerList: 从Eureka中获取
4.8 Server 状态
怎么轮询怎么选择过滤都已经分析了,但是过滤和选择中使用到 Server Status 是怎么统计的,接下去看。
ServerStats 类记录了 server 的所有状态。
4.8.1 判断是否跳过
下面是判断是否跳过 server 上面已经分析,其中 stats.isCircuitBreakerTripped 是判断的关键
// AvailabilityPredicateshouldSkipServer(ServerStats stats) { if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true;
} return false;
}public boolean isCircuitBreakerTripped(long currentTime) { // 获取故障的到期时间点
long circuitBreakerTimeout = getCircuitBreakerTimeout(); if (circuitBreakerTimeout <= 0) { return false;
} // 大于当前时间表示还在出于故障
return circuitBreakerTimeout > currentTime;
}private long getCircuitBreakerTimeout() { long blackOutPeriod = getCircuitBreakerBlackoutPeriod(); if (blackOutPeriod <= 0) { return 0;
} // 上次失败的时间点 + 需要断路的时间长度
return lastConnectionFailedTimestamp + blackOutPeriod;
}private long getCircuitBreakerBlackoutPeriod() { int failureCount = successiveConnectionFailureCount.get(); int threshold = connectionFailureThreshold.get(); // 小于阀值(默认3)即不断路
if (failureCount < threshold) { return 0;
} // diff 超过阀值的次数,最大16
int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold); // blackOutSeconds 最大 2^16 * 基数时间
int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get(); // 再次进行限制,断路总时间不超过 maxCircuitTrippedTimeout.get
if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
blackOutSeconds = maxCircuitTrippedTimeout.get();
} return blackOutSeconds * 1000L;
}判断是否在断路不可用状态就这样,下面看一些状态是怎么进去的。
4.8.2 记录状态
LoadBalancerCommand 中有状态的记录
// 这里开始loadBalancerContext.noteOpenConnection(stats);@Overridepublic void onCompleted() { // 记录准确状态
recordStats(tracer, stats, entity, null);
}@Overridepublic void onError(Throwable e) { // 记录错误状态
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server); if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}@Overridepublic void onNext(T entity) { this.entity = entity; if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop(); // 这里介绍
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}// loadBalancerContextpublic void noteOpenConnection(ServerStats serverStats) { if (serverStats == null) { return;
} try {
serverStats.incrementActiveRequestsCount();
} catch (Exception ex) {
logger.error("Error noting stats for client {}", clientName, ex);
}
}// serverStats// 各种记录public void incrementActiveRequestsCount() {
activeRequestsCount.incrementAndGet();
requestCountInWindow.increment(); long currentTime = System.currentTimeMillis();
lastActiveRequestsCountChangeTimestamp = currentTime;
lastAccessedTimestamp = currentTime; if (firstConnectionTimestamp == 0) {
firstConnectionTimestamp = currentTime;
}
}// loadBalancerContextpublic void noteRequestCompletion(ServerStats stats, Object response, Throwable e, long responseTime, RetryHandler errorHandler) { if (stats == null) { return;
} try {
recordStats(stats, responseTime);
RetryHandler callErrorHandler = errorHandler == null ? getRetryHandler() : errorHandler; if (callErrorHandler != null && response != null) { // 没有错误时,清除连续错误标识
stats.clearSuccessiveConnectionFailureCount();
} else if (callErrorHandler != null && e != null) { // 判断是否需要断路的exception
if (callErrorHandler.isCircuitTrippingException(e)) { // 有错误时开始连续错误计数
stats.incrementSuccessiveConnectionFailureCount();
// 增加错误数
stats.addToFailureCount();
} else { // 非断路错误时清除连续标识
stats.clearSuccessiveConnectionFailureCount();
}
}
} catch (Exception ex) {
logger.error("Error noting stats for client {}", clientName, ex);
}
}// 退场专用private void recordStats(ServerStats stats, long responseTime) { if (stats == null) { return;
} // 活动请求数减一
stats.decrementActiveRequestsCount(); // 增加请求统计
stats.incrementNumRequests(); // 记录响应时间,有些负载策略需要响应时间
stats.noteResponseTime(responseTime);
}5. 回顾
5.1 调用路径
// 调用路径1.HandleMapping -> 2.ZuulController -> 3.ZuulServlet.service() -> 4.RibbonRoutingFilter ->5.HystrixCommand.execute() -> 6.AbstractRibbonCommand.run() ->7.RibbonLoadBalancingHttpClient.executeWithLoadBalance() ->8.LoadBalancerCommand.submit() -> 9.RibbonLoadBalancingHttpClient.execute() ->10.HttpClient.execute()
1-4 都比较容易,5是为了有熔断效果所以用 Hystrix 进行包装,实际的逻辑都是对应的 Command 完成,7是不同的 Command 持有一个对应的 Client,执行 executeWithLoadBalance() 为了达到负载均衡和重试的效果,这个效果就交给 8.LoadBalancerCommand 完成,但是 LoadBalancerCommand 也只负责重试和负载均衡,具体执行的远程 http 请求还是由 9 来完成,而每个 BalancingClient 都是持有个真实的 client, 如: HttpClient, OKHttp,由这些 client 执行。
5.2 分支逻辑
5.2.1 负载均衡
分析了怎么进行 selectServer 的过程,以及常用的 ILoadBalancer 类型,对应的 IRule 即真实挑选和负载轮询逻辑实现。
5.2.2 状态记录
负载轮询的挑选逻辑中使用到 Server 的状态,所以分析了状态的记录以及怎么判断是否在断路状态的主要逻辑。
5.3 总结
Zuul 的主要代码并不是很大,即请求进来然后进行 Filter 处理,路由到上游服务器都是 Ribbon 的逻辑。
作者:alexqdjay
来源:https://my.oschina.net/alexqdjay/blog/1802503
共同學習,寫下你的評論
評論加載中...
作者其他優質文章