nacos入門系列之注冊中心
最近才接触到nacos,于是决定花点时间学习一下。 源码下载:https://github.com/alibaba/nacos 目录结构如下:
模块划分:
console:控制台相关代码
config:动态配置发现,配置中心的代码
naming:动态服务发现,注册中心的代码
core:核心服务被多个模块依赖
common:工具类
client:Nacos客户端代码
api
服务注册发现:
nacos支持动态配置服务,服务发现管理和动态DNS服务,这里先从服务注册发现来对其进行学习。
在学习之前先来看看怎么本地启动nacos:
配置-Dnacos.standalone=true,然后启动console模块的main函数
com.alibaba.nacos.Nacos
启动完成可以访问 http://localhost:8848/nacos/index.html来进行登录管理页面。
我们可以看到服务列表是空的,那么如何让其展示我们注册的服务呢?
让我们从最简单的代码来注册一个我们自己的服务吧。
public static void main(String[] args) throws NacosException, InterruptedException {
String serviceName="helloNacos";
//nacos的地址
NamingService namingService= NacosFactory.createNamingService("localhost:8848");
//发布服务到nacos 假设服务端口是8091
namingService.registerInstance(serviceName,"localhost",8091);
Thread.sleep(Integer.MAX_VALUE);
}运行上面的代码后,回到页面会发现
我们的服务已经注册到了nacos。
了解了服务如何注册,我们就需要考虑如何对服务进行监听了。
public static void main(String[] args) throws NacosException, InterruptedException {
String serviceName="helloNacos";
NamingService namingService= NacosFactory.createNamingService("localhost:8848");
namingService.subscribe(serviceName,event -> {
if(event instanceof NamingEvent){
System.out.println("订阅到了数据变化");
System.out.println((((NamingEvent) event).getInstances()));
}
});
System.out.println("订阅完成,等待服务信息变化");
Thread.sleep(Integer.MAX_VALUE);
}当我们运行上面的代码会打印如下:
订阅完成,等待服务信息变化
订阅到了数据变化
[{
"clusterName": "DEFAULT",
"enabled": true,
"ephemeral": true,
"healthy": true,
"instanceHeartBeatInterval": 5000,
"instanceHeartBeatTimeOut": 15000,
"instanceId": "localhost#8091#DEFAULT#DEFAULT_GROUP@@helloNacos",
"instanceIdGenerator": "simple",
"ip": "localhost",
"ipDeleteTimeout": 30000,
"metadata": {},
"port": 8091,
"serviceName": "DEFAULT_GROUP@@helloNacos",
"weight": 1.0
}]可以看到我们已经成功对服务进行了订阅,在json中包含了服务的ip和端口等信息。
这样我们就完成了最简单的注册中心功能的使用。
实现的原理:
接下来我们来看看nacos是怎么实现注册中心功能的,和常见的zookeeper又有什么不同呢?
在上面的代码中都用到了
NamingService namingService= NacosFactory.createNamingService("localhost:8848");就从这里入手吧,先来看看服务是怎么注册的
NamingFactory是一个统一的入口,初始化了NacosNamingService:
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");然后找到上面简单的代码点进去
//发布服务到nacos 假设服务端口是8091 namingService.registerInstance(serviceName,"localhost",8091);
发现最终会通过http的post请求来进行注册:
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
具体的地址为/nacos/v1/ns/instance,代码在如下类中
com.alibaba.nacos.naming.controllers.InstanceController;
InstanceController会处理客户端发送的注册请求:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
final Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}到这里我们知道了客户端是通过http请求的post方式来进行注册的,在nacos服务端是如何注册的呢,又是如何
通知服务的订阅者们感知服务的变化?
我们先简单了解一下CAP,默认都知道CAP是什么。
通常ZK作为注册中心是CP的原则,ZK通过自己的ZAP协议来保证数据的最终一致性,但是可能会造成短暂的不可用。
Nacos既可以实现CP也可以实现AP。
Nacos中CP的实现是基于简化的Raft,类似于ZK的ZAP协议。这里我们不深入讨论。
Nacos也实现了AP,基于AP的时候nacos服务端没有主从的区别,当某个节点挂掉后,不会有类似ZAP的选主过程
客户端的请求会自动切换到新的Nacos节点。当集群中某个Nacos服务器更新了本机的服务实例后,会通知集群中
其他服务器更新自身的服务实例信息。在这个过程中,向不同的服务器发起请求可能会得到不一样的服务实例列表。
OK,简单理解之后我们就来看看基于AP的代码是怎么实现的把。
深入下面的代码
serviceManager.registerInstance(namespaceId, serviceName, instance);
consistencyService.put(key, instances);
在AP原则下
ConsistencyService的实现类是DistroConsistencyServiceImpl
public void put(String key, Record value) throws NacosException {
onPut(key, value);
taskDispatcher.addTask(key);
}在onPut中首先会将注册信息保存到map中,然后会会判断是否是第一次注册,否则说明是CHANGE则会继续调用
后续的代码
dataStore.put(key, datum);
notifier.addTask(key, ApplyAction.CHANGE);//addTask将信息加入队列tasks
Notifier是一个线程,会不停的从队列中获取消息tasks.take();
然后进行handle:
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}这里的listener其实就是
com.alibaba.nacos.naming.core.Service 进入Service的onChange方法: //这里通知发送请求到客户端 //假如当前服务被两个地址发布,则getInstanceList会包含两个地址的信息 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
最终获取到服务所有的地址列表后会通过PushService类进行发送
getPushService().serviceChanged(this);
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
return;
}
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}最后会通过UDP请求通知服务订阅者,这里就不详细深入了。
我们接着看是nacos是如何同步数据给其他节点的呢?
taskDispatcher.addTask(key);
TaskScheduler也是一个线程,会不停的从队列中获取信息
当有服务发生变化,会将服务信息加入队列比如:
key:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@helloNacos
public void addTask(String key) {
queue.offer(key);
}在线程具体的run方法中主要是先获取集群所有的nacos节点,然后遍历对每个节点发送http请求。
通过key可以获取到服务所有注册的地址列表信息。具体代码如下:
//遍历所有nacos服务端,发送http请求同步数据
for (Member member : dataSyncer.getServers()) {
if (NetUtils.localServer().equals(member.getAddress())) {
continue;
}
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getAddress());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
}
dataSyncer.submit(syncTask, 0);
}// 2. get the datums by keys and check the datum is empty or not
Map<String, Datum> datumMap = dataStore.batchGet(keys);
if (datumMap == null || datumMap.isEmpty()) {
// clear all flags of this task:
for (String key : keys) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
return;
}
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
boolean success = NamingProxy.syncData(data, task.getTargetServer());到这里整个服务注册就结束了。
最后让我们来看看客户端是如何订阅感知服务的变化的吧。
首先找到接收PushService发送请求的类吧
PushReceiver:这里会接收到UDP的请求,获取到改变的信息然后调用 hostReactor.processServiceJSON(pushPacket.data); 最终会调用eventDispatcher.serviceChanged(serviceInfo);
这里需要回到最简单的代码来看:
NacosNamingService初始化的时候会进行init,初始化一个EventDispatcher();
EventDispatcher的作用主要是处理服务端nacos推送来的数据,这里代表服务列表发生变化。
它会单独启动一个Notifier线程,从队列中接收数据:
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
//处理服务变化
public void serviceChanged(ServiceInfo serviceInfo) {
if (serviceInfo == null) {
return;
}
changedServices.add(serviceInfo);
}到这里我们之间看Notifier的run方法
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);//从队列获取内容
//进行事件回调
for (EventListener listener : listeners) {
List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
}ok这个listeners里面的内容从哪里来?当然是我们自己加入的哦
namingService.subscribe(serviceName,event -> {
if(event instanceof NamingEvent){
System.out.println("订阅到了数据变化");
System.out.println((((NamingEvent) event).getInstances()));
}
});到这里整个注册中心的功能已经梳理的差不多了,还需要多学习。
后面会继续来看看Nacos是如何实现配置中心的。加油。
共同學習,寫下你的評論
評論加載中...
作者其他優質文章



