Nacos Client客户端详解

2022/4/22 nacos

nacos客户端sdk详解

# 一 NacosNamingService解析

# 1.1导入maven依赖

<dependency>
  <groupId>com.alibaba.nacos</groupId>
  <artifactId>nacos-client</artifactId>
  <version>1.3.3</version>
  <scope>compile</scope>
</dependency>

# 1.2创建客户端对象服务

NamingFactory.createNamingService(Properties properties)

解析:通过反射创建实例对象com.alibaba.nacos.client.naming.NacosNamingService

# 1.3参数解析

  1. 检查properties.getProperty中contextPath参数,不能包含"\/"
  2. 初始化namespace
    • 从System.getProperties中获取nacos.use.cloud.namespace.parsing变量值(默认为true
    • 从properties.getProperty中获取isUseCloudNamespaceParsing变量值(默认值从步骤1中获取
    • 步骤2中获取的变量表示:是否使用云命名空间解析
    • 是则依次从System.getProperties中获取ans.namespace、System.getenv中获取ALIBABA_ALIWARE_NAMESPACE 变量值为命名空间值,获取到直接返回
    • 如果命名空间为空,则从System.getProperties中获取namespace变量值
    • 如果命名空间为空,则从properties.getProperty中获取namespace变量值
    • 如果命名空间为空,默认值为public
  3. 初始化服务地址
    • serverList: 从properties.getProperty中获取serverAddr变量值
    • 初始化endpoint
      • 从System.getProperties中获取nacos.use.endpoint.parsing.rule变量值(默认为true
      • 从properties.getProperty中获取isUseEndpointParsingRule变量值(默认值从步骤1中获取
      • 步骤2中获取的变量表示:是否使用Endpoint解析规则
      • 是则从properties.getProperty中获取endpoint变量值(ParamUtil.parsingEndpointRule(...)解析获取真实值)
    • 如果endpoint值不为空,则设置serverList值为空
  4. 初始化WebRootContext**(此属性最不好要修改,保持默认值)**
    • 从System.getProperties中获取nacos.naming.web.context变量值,默认值为/nacos
    • 步骤1变量值不为空,则设置UtilAndComs的值
      • webContext = [/]webContext
      • nacosUrlBase = [/]webContext + "/v1/ns"
      • nacosUrlInstance = [/]webContext + "/v1/ns/instance"
  5. 初始化缓存目录
    • 依次从System.getProperties中获取com.alibaba.nacos.naming.cache.dir变量值,默认值为System.getProperties("user.home") + "/nacos/naming/" + namespace
  6. 初始化日志名称
    • 从System.getProperties中获取com.alibaba.nacos.naming.log.filename变量值
    • 如果为空,则从properties.getProperty中获取com.alibaba.nacos.naming.log.filename变量值
    • 如果为空,默认值为naming.log

# 1.4创建EventDispatcher

private final BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();
private final ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>();
private volatile boolean closed = false;
  1. 创建单线程执行器类型的线程池excutor,现成名称为com.alibaba.nacos.naming.client.listener,1线程

  2. 执行内部类Notifier,Notifier内部无限循环执行

    • 从阻塞队列changedServices获取ServiceInfo直到超时(5分钟)

    • ServiceInfo为空继续循环

    • ServiceInfo不为空,从observerMap获取事件监听列表List<EventListener> listeners

    • 事件监听列表listeners不为空,依次执行NamingEvent事件

      Notifier.png

  3. EventDispatcher提供了添加监听器方法,将EventListener添加到observerMap中并将ServiceInfo添加到changedServices阻塞队列中触发事件监听

    public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener {
    	...
    }
    
  4. EventDispatcher提供了移除监听器方法,将EventListener从observerMap中移除

    public void removeListener(String serviceName, String clusters, EventListener listener) {
    	...
    }
    
  5. 作用:订阅服务,取消订阅

# 1.5创建NamingProxy

  1. 创建SecurityProxy

    • 从properties.getPropery中获取username变量值,默认值为""
    • 从properties.getPropery中获取password变量值,默认值为""
    • 从properties.getPropery中获取contextPath变量值,默认值为/nacos
    • 作用:Nacos Server服务登录,刷新accessToken(登录uri:/nacos/v1/auth/users/login)
  2. 从System.getProperties中获取nacos.naming.exposed.port变量值,默认值为8848

  3. 创建调度线程池,名称为:com.alibaba.nacos.client.naming.updater,2线程

    • 每30s,执行refreshSrvIfNeed(),从endpoint中获取Nacos Server列表,并赋值serversFromEndpoint

    • 每5s,执行securityProxy.login(getServerList()),登录Nacos server,刷新accessToken,保持登录状态

      private List<String> getServerList() {
        List<String> snapshot = serversFromEndpoint;
        if (!CollectionUtils.isEmpty(serverList)) {
          snapshot = serverList;
        }
        return snapshot;
      }
      
  4. NamingProxy提供基于http的操作(Instance、Service、BeatInfo),屏蔽掉调用细节

    • 注册服务、取消注册服务、查询服务、创建服务、删除服务、更新服务
    • 更新实例、查询实例列表
    • 发送心跳、服务是否健康

# 1.6创建BeatReactor

public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
  1. 从properties.getPropery中获取namingClientBeatThreadCount变量值,默认值为(可用处理器/2 | 1)
  2. 创建调度线程池,名称为:com.alibaba.nacos.naming.beat.sender
  3. 延时默认值:BeatInfo.getPeriod() —> Instance.getInstanceHeartBeatInterval()
    • 通过NamingProxy发送心跳,如果响应码为20404,则通过NamingProxy重新注册服务
    • 确定下一次延时时间,从响应结果中获取clientBeatInterval变量值,没有则取默认值
  4. BeatReactor提供添加心跳信息、移除心跳信息(Instance、BeatInfo)

# 1.7创建HostReactor

private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
private final Map<String, ServiceInfo> serviceInfoMap;
private final Map<String, Object> updatingMap;
  1. 从properties.getPropery中获取namingLoadCacheAtStart变量值,默认值为false

  2. 从properties.getPropery中获取namingPollingThreadCount变量值,默认值为(可用处理器/2 | 1)

  3. 如果步骤1值为true,DiskCache.read(this.cacheDir)加载磁盘缓存服务信息到serviceInfoMap中

    if (loadCacheAtStart) {
    	this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
    	this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }
    
  4. 创建调度线程池,名称为:com.alibaba.nacos.client.naming.updater,线程数为步骤2获取值

  5. 如果步骤1变量值为true,将加载本地缓存服务的文件,DiskCache.read(this.cacheDir)

  6. 创建FailoverReactor

    • 创建调度线程池,名称为:com.alibaba.nacos.naming.failover,1线程

    • 创建SwitchRefresher

      private Map<String, ServiceInfo> serviceMap = new ConcurrentHashMap<String, ServiceInfo>();
      private final Map<String, String> switchParams = new ConcurrentHashMap<String, String>();
      
      • 每5s执行一次
      • 查询文件:cacheDir + "/failover00-00---000-VIPSRV_FAILOVER_SWITCH-000---00-00"
      • 若值为1,则switchParams.put("failover-mode", "true")
        • 创建FailoverFileReader
        • 执行方法run()
        • 读取文件夹:cacheDir + "/failover"
        • 赋值给serviceMap
      • 否则switchParams.put("failover-mode", "false")
    • 创建DiskFileWriter

      • 延时30m每1d、延时10s执行
      • 从HostReactor的serviceInfoMap缓存中的服务信息
      • 依次刷入磁盘:DiskCache.write(serviceInfo, failoverDir)
    • serviceMap:存放故障转移文件(cacheDir + "/failover")中加载的服务信息

    • switchParams:存放是否启用故障转移

  7. 创建PushReceiver

    • 创建udpSocket服务,用户接收Nacos Server端推送的服务变更信息
    • 创建调度线程池,名称为:com.alibaba.nacos.naming.push.receiver,1线程
    • 轮训接收udpSocket中推送的消息,转发给HostReactor.processServiceJson(String)
    • 作用:被动刷新内存中服务信息
  8. HostReactor:processServiceJson(String)

    • 替换serviceInfoMap中对应的服务缓存信息
    • 当有实例更新时,尝试更新心跳信息
    • EventDispatcher发布服务变更事件(通知服务订阅者)
    • 将ServiceInfo刷入磁盘
  9. HostReactor:getServiceInfo(String serviceName, String clusters)

    • 判断是否启用故障转移,启用则从FailoverReactor的serviceMap中获取ServiceInfo,直接返回
    • 否则在serviceInfoMap中获取ServiceInfo
    • ServiceInfo不为空,判断是否正在从服务端获取
      • 是则等待获取同步锁后,等待5s钟
      • 否则执行NamingProxy.queryList(...)获取
    • 判断是否添加定时任务UpdateTask(定时更新服务信息)
      • 初始值延时1s执行
      • 刷新对应的服务信息
      • 如果服务信息有误,则failCount自增1;无误则重置failCount
      • 延时时间 delayTime << failCount,最大值60s(1s,2s,4s,8s,16s,32s,60s)

# 1.8NacosNamingService

  1. 参数说明

    • Instance:实例信息
      • instanceId:实例id
      • ip:ip地址
      • port:端口
      • serviceName:服务名称
      • metadata:元信息
      • weight:权重,默认值1.0
      • healthy:是否健康,默认值true
      • enabled:是否启用,默认值true
      • ephemeral:是否是临时实例,默认值true
      • clusterName:集群名称,默认值DEFAULT
    • groupName:分组名称,默认值DEFAULT_GROUP
  2. registerInstance:注册实例

    • 判断是否是临时实例,是则构建心跳信息,并添加心跳调度任务BeatTask

      if (instance.isEphemeral()) {
          BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
          beatReactor.addBeatInfo(groupedServiceName, beatInfo);
      }
      
    • 调用http注册服务

      serverProxy.registerService(groupedServiceName, groupName, instance);
      
  3. deregisterInstance:注销实例

    • 判断是否是临时实例,是则停止心跳调度任务BeatTask

      if (instance.isEphemeral()) {
          beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
      }
      
    • 调用http注销服务

      serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
      
  4. getAllInstances:获取服务所有实例

    • subscribe:是否订阅,默认值true
    • 如果不是订阅模式,则直接从服务器获取服务实例列表
    • 否则优先从缓存中获取,没有再从服务器获取,并添加定时拉去调度任务
  5. selectInstances:根据健康状态选择实例

  6. selectOneHealthyInstance:根据权重随机选择一个实例

  7. subscribe:订阅服务

  8. unsubscribe:取消订阅

# 二 NacosConfigService解析

# 2.1导入maven依赖

<dependency>
  <groupId>com.alibaba.nacos</groupId>
  <artifactId>nacos-client</artifactId>
  <version>1.3.3</version>
  <scope>compile</scope>
</dependency>

# 2.2创建客户端对象服务

NamingFactory.createNamingService(properties)

解析:通过反射创建实例对象com.alibaba.nacos.client.config.NacosConfigService

# 2.3参数解析

  1. 检查properties.getProperty中contextPath参数,不能包含"/"
  2. 从properties.getProperty中获取**encode**变量值,默认值UTF-8
  3. 初始化namespace
    • 详解见
    • 注意:从云命名空间解析后没有值,仅会从properties.getProperty获取变量值

# 2.4创建ServerHttpAgent

  1. 创建ServerListManager

    volatile List<String> serverUrls = new ArrayList<String>();
    private volatile String currentServerAddr;
    
    • 从properties.getProperty中获取serverAddr变量值
    • 从properties.getProperty中获取namespace变量值
    • 初始化endpoint,详解见,从System.getenv中获取ALIBABA_ALIWARE_ENDPOINT_PORT、properties.getProperty中获取endpointPort变量值
    • 从properties.getProperty中获取contextPath、System.getProperty中获取nacos.client.contextPath变量值,默认值nacos
    • 从properties.getProperty中获取clusterName变量值,默认值serverlist
    • 如果serverAddrsStr不为空
      • 将isFixed置为true,表示Nacos Server列表是固定的,不需要动态获取
      • 给服务列表补齐协议和端口,端口:从System.getProperty中获取nacos.server.port,默认值8848;协议:http://
      • 赋值给serverUrls
      • 创建name:"fixed" + getFixedNameSuffix(...) [+ namespace]
    • 如果serverAddrsStr为空
      • 将isFixed置为false
      • 创建name:endpoint [+ "-" + namespace]
      • 创建addressServerUrl,用于获取Nacos Server服务列表
    • start():获取Nacos Server服务列表
      • 创建GetServerListTask任务,并执行一次,如果失败最多尝试5次后抛出异常
      • 每30s,根据addressServerUrl获取Nacos Server服务列表,并设置到serverUrls
    • 作用:持有Nacos Server服务列表,并提供一条可用的Nacos Server链接
  2. 创建SecurityProxy,详解见

  3. 从properties.getProperty中获取maxRetry变量值,默认值3

  4. 开启调度刷新accessToken任务

  5. 作用:提供httpGet、httpPost、httpDelete方法,屏蔽掉具体调用细节

# 2.5创建MetricsHttpAgent

  1. 代理ServerHttpAgent,增加调用时间监控
  2. 启动HttpAgent,即启动ServerListManager,作用是当配置了endpoint时,定时刷新serverUrls
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();

# 2.6创建ClientWorker

private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());
  1. properties.getProperty中获取configLongPollTimeout变量值,默认值30000,最小值10000

  2. 从properties.getProperty中获取configRetryTime变量值,默认值2000

  3. 从properties.getProperty中获取enableRemoteSyncConfig变量值,默认值false

  4. 创建调度线程池,名称为:"com.alibaba.nacos.client.Worker." + agent.getName(),1线程

    public void checkConfigInfo() {
      // Dispatch taskes.
      int listenerSize = cacheMap.get().size();
      // Round up the longingTaskCount.
      int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
      if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
          // The task list is no order.So it maybe has issues when changing.
          executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
      }
    }
    
    • 延时1ms每10ms执行
    • 将cacheMap中的CacheData分组,有多少个组创建多少个步骤5的线程执行任务LongPollingRunnable
    • 分组算法为cacheMap总数量除每个任务配置大小再向上取整
    • 每个任务配置大小:从System.getProperty中获取PER_TASK_CONFIG_SIZE变量值,默认值3000
  5. 创建调度线程池,名称为:"com.alibaba.nacos.client.Worker.longPolling." + agent.getName(),线程为:可用处理器数量

    • 仅会执行自己分组内的CacheData配置更新检查和刷盘操作,通过taskId关联
    • 获取故障转移文件,System.getProperty中依次取JM.SNAPSHOT.PATH、user.home属性值 + "/nacos/config/" + serverName + "_nacos/data/config-data[-tenant/" + tenant + "]/" + group + "/" + dataId
    • 判断是否启动本地配置信息(CacheData.isUseLocalConfigInfo)
      • 不启用并且故障转移文件存在,则加载故障转移文件中配置,并设置CacheData为启用本地配置信息,重新计算MD5
      • 启用并且文件不存在,设置CacheData为不启用本地配置信息
      • 启用并且文件存在并且故障转移文件有修改,则加载故障转移转移文件中配置,重新计算MD5
      • 启用,检查监听器MD5,如果监听器中的MD5与CacheData中的MD5是否相同
      • 不同则通知监听器,即监听回调,listener.receiveConfigInfo(contentTmp)
      • 如果监听器是AbstractConfigChangeListener,触发改变数据事件的回调,listener.receiveConfigChange(event),配置为最新值
      • 更新监听器包装器中的MD5为最新值
    • 不启用本地配置,将从Nacos Server通过长链接获取
      • 长链接超时时间,配置见
      • 读超时时间,长链接超时 * 3 / 2
      • 如果返回配置数据为空,不处理
      • 如果返回配置数据有值,表示有哪些配置有被修改
      • 重新发起获取配置信息,返回成功或者找不到,更新本地快照文件
      • 将新配置信息设置到CacheData中,检查监听MD5,触发通知监听器
  6. addListeners:添加监听器,如果CacheData不存在,先创建在添加Listener

  7. removeListener:移除监听器,如果CacheData存在才移除监听器

# 2.7NacosConfigService

获取配置数据

  1. getConfig:获取配置

    • 优先获取故障转移文件配置,有该文件,则直接返回
    • 没有故障转移文件,则拉取Nacos Server配置,直接返回
    • 如果从Nacos Server拉取报错,则从本地快照文件中获取配置信息
  2. getConfigAndSignListener:获取配置并添加监听器

    添加监听器及回调

  3. addListener:添加监听器

  4. publishConfig:发布配置信息,将配置发布到Nacos Server

  5. removeConfig:移除配置信息,将Nacos Server中的配置删除

  6. removeListener:删除监听器

# 三 参数配置说明

# 3.1NacosConfigService

  • Properties
    • contextPath:nacos server服务的上下文路径
    • clusterName
    • encode:用户配置信息data编解码
    • isUseCloudNamespaceParsing:是否启用云命名空间解析
    • namespace:命名空间
    • serverAddr:nacos server地址
    • endpointPort:endpoint端口
    • endpoint:endpoint用于动态获取nacos server地址
    • isUseEndpointParsingRule:是否启用endpoint解析规则,${xx:defaultEndpointUrl}
    • username:用户名
    • password:密码
    • ramRoleName:Sts临时凭证时使用
    • accessKey:aliyun MSE使用
    • secretKey:aliyun MSE使用
    • maxRetry:http最大重试次数
    • configLongPollTimeout:拉取变更配置长连接超时时间
    • configRetryTime:当长连接处理配置变更出错时,延时时长
    • enableRemoteSyncConfig:当创建CacheDate时,是否拉取配置信息
  • System.getProperties
    • nacos.use.cloud.namespace.parsing:是否启用云命名空间解析
    • acm.namespace:云命名空间
    • tenant.id:租户id
    • nacos.use.endpoint.parsing.rule:是否启用endpoint解析规则,${xx:defaultEndpointUrl}
    • xx:endpoint解析规则中的endpoint变量名称,获取endpoint
    • nacos.server.port:nacos server服务的端口
    • nacos.client.contextPath:nacos server服务的上下文路径
    • nacos.client.appKey:客户端身份信息
    • project.name:工程名称
    • jboss.server.home.dir:jboss home文件夹,用于判断server类型
    • jetty.home:jetty home文件夹,用于判断server类型
    • catalina.base:tomcat文件夹,用于判断server类型
    • NACOS.CONNECT.TIMEOUT:nacos server链接超时
    • PER_TASK_CONFIG_SIZE:将配置分组的大小
    • ram.role.name:STS临时凭证时使用
    • time.to.refresh.in.millisecond:STS临时凭证时使用
    • security.credentials:STS临时凭证时使用
    • security.credentials.url:STS临时凭证时使用
    • cache.security.credentials:STS临时凭证时使用
    • spas.identity:aliyun MSE使用
    • JM.SNAPSHOT.PATH:配置信息快照base路径,用于存放故障转移文件和配置快照信息
  • System.getenv
    • ALIBABA_ALIWARE_NAMESPACE:云命名空间
    • ALIBABA_ALIWARE_ENDPOINT_PORT:云endpoint端口
    • ALIBABA_ALIWARE_ENDPOINT_URL:云endpoint
    • xx:endpoint解析规则中的endpoint变量名称,获取endpoint
    • spas_accessKey:aliyun MSE使用
    • spas_secretKey:aliyun MSE使用

# 3.2NacosNamingService