本文转载自微信公众号《程序新视界》,作者为二哥。转载本文请联系程序新视界公众号。说起Nacos的服务订阅机制,不了解的朋友可能会觉得很神秘。本文将带您深入了解Nacos2.0客户端的订阅实现。由于涉及的内容较多,我将分几篇进行,本篇为第一篇。Nacos订阅概述Nacos的订阅机制可以用一句话来描述:Nacos客户端每6秒通过定时任务从注册中心获取实例列表,当发现实例发生变化时,发布一个变化事件,并订户进行业务处理。updateinstanceupdateinstanceupdatelocalcache更新本地缓存的更新实例。上图nacos上展示了订阅方式的主要流程,涉及的内容很多,处理细节也比较复杂。这里我们只需要抓住最核心的部分。下面通过代码和流程图一步步分析上面的过程。从订阅到定时任务,我们这里说的订阅机制本质上是一种准实时感知的服务发现。上面我们已经看到,当订阅方法执行时,会触发一个定时任务,定时从服务器拉取数据。所以本质上,订阅机制是一种实现服务发现的方式,比较方式是直接查询接口。NacosNamingService暴露了很多重载的订阅。重载的目的就是让大家少写参数。Nacos默认处理这些参数。最后,这些重载的方法会调用下面的方法:,",");changeNotifier.registerListener(groupName,serviceName,clusterString,listener);clientProxy.subscribe(serviceName,groupName,clusterString);}方法中的事件监听我们暂时不说,直接看subscribe方法,其中clientProxy类型是NamingClientProxyDelegate。在实例化NacosNamingService时实例化该类。上一章已经提到,这里不再赘述。clientProxy.subscribe方法是在NamingClientProxyDelegate中实现的:.get(serviceKey);if(null==result){//如果为null,则进行订阅逻辑处理,基于gRPC协议result=grpcClientProxy.subscribe(serviceName,groupName,clusters);}//定时调度UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName,groupName,clusters);//ServiceInfo本地缓存处理serviceInfoHolder.processServiceInfo(result);returnresult;}这个方法是不是很眼熟?我们已经谈过了。看似殊途同归,查询服务列表和订阅最终都调用了同一个方法。上一篇文章讲了其他的流程。这里重点关注任务调度://ServiceInfoUpdateServicepublicvoidscheduleUpdateIfAbsent(StringserviceName,StringgroupName,Stringclusters){StringserviceKey=ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName,groupName),clusters);if(futureMap.get(serviceKey)!=null){return;}synchronized(futureMap){if(futureMap.get(serviceKey)!=null){return;}//构建UpdateTaskScheduledFuturefuture=addTask(newUpdateTask(serviceName,groupName,clusters));futureMap.put(serviceKey,future);}}该方法包括构造serviceKey,通过serviceKey判断权重,最后加入UpdateTask。addTask的实现是发起一个定时任务://ServiceInfoUpdateServiceprivatesynchronizedScheduledFutureaddTask(UpdateTasktask){returnexecutor.schedule(task,DEFAULT_DELAY,TimeUnit.MILLISECONDS);}定时任务延时1秒执行。跟踪到此结束。核心功能只有两个:调用订阅方法和发起定时任务。定时任务做了什么?UpdateTask封装了订阅机制的核心业务逻辑。我们先通过一张流程图来看一下到底做了什么。有了上面的流程图,nacos就基本清楚UpdateTask是干什么的了。直接贴run方法的所有代码:publicvoidrun(){longdelayTime=DEFAULT_DELAY;try{//判断注册的Service是否订阅,如果没有订阅,不再执行if(!changeNotifier.isSubscribed(groupName,serviceName,clusters)&&!futureMap.containsKey(serviceKey)){NAMING_LOGGER.info("updatetaskisstopped,service:"+groupedServiceName+",clusters:"+clusters);return;}//获取缓存的服务信息ServiceInfoserviceObj=serviceInfoHolder.getServiceInfoMap().get(serviceKey);if(serviceObj==null){//根据serviceName,从注册服务器获取Service信息serviceObj=namingClientProxy.queryInstancesOfService(serviceName,groupName,clusters,0,false);serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime=serviceObj.getLastRefTime();return;}//过期服务(服务最新更新时间小于等于缓存刷新时间),重新从注册中心查询if(serviceObj.getLastRefTime()<=lastRefTime){serviceObj=namingClientProxy.queryInstancesOfService(serviceName,groupName,clusters,0,false);//处理服务消息serviceInfoHolder.processServiceInfo(serviceObj);}//刷新更新时间lastRefTime=serviceObj.getLastRefTime();if(CollectionUtils.isEmpty(serviceObj.getHosts())){incFailCount();return;}//下次更新缓存时间设置,默认6秒//TODOmultipletime可配置.delayTime=serviceObj.getCacheMillis()*DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;//重置失败的次数为0refresh,下次执行的时间与failCount相关//failCount=0,下次调度时间为6秒,最多1分钟//即在没有异常执行者的情况下,缓存实例的刷新时间为6秒。schedule(this,Math.min(delayTime<
