当前位置: 首页 > 科技观察

NacosClient服务订阅机制核心流程

时间:2023-03-13 04:55:08 科技观察

本文转载自微信公众号《程序新视界》,作者为二哥。转载本文请联系程序新视界公众号。说起Nacos的服务订阅机制,不了解的朋友可能会觉得很神秘。本文将带您深入了解Nacos2.0客户端的订阅实现。由于涉及的内容较多,我将分几篇进行,本篇为第一篇。Nacos订阅概述Nacos的订阅机制可以用一句话来描述:Nacos客户端每6秒通过定时任务从注册中心获取实例列表,当发现实例发生变化时,发布一个变化事件,并订户进行业务处理。updateinstanceupdateinstanceupdatelocalcache更新本地缓存的更新实例。上图nacos上展示了订阅方式的主要流程,涉及的内容很多,处理细节也比较复杂。这里我们只需要抓住最核心的部分。下面通过代码和流程图一步步分析上面的过程。从订阅到定时任务,我们这里说的订阅机制本质上是一种准实时感知的服务发现。上面我们已经看到,当订阅方法执行时,会触发一个定时任务,定时从服务器拉取数据。所以本质上,订阅机制是一种实现服务发现的方式,比较方式是直接查询接口。NacosNamingService暴露了很多重载的订阅。重载的目的就是让大家少写参数。Nacos默认处理这些参数。最后,这些重载的方法会调用下面的方法:,",");changeNotifier.registerListener(groupName,serviceName,clusterString,listener);clientProxy.subscribe(serviceName,groupName,clusterString);}方法中的事件监听我们暂时不说,直接看subscribe方法,其中clientProxy类型是NamingClientProxyDelegate。在实例化NacosNamingService时实例化该类。上一章已经提到,这里不再赘述。clientProxy.subscribe方法是在NamingClientProxyDelegate中实现的:.get(serviceKey);if(null==result){//如果为null,则进行订阅逻辑处理,基于gRPC协议result=grpcClientProxy.subscribe(serviceName,groupName,clusters);}//定时调度UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName,groupName,clusters);//ServiceInfo本地缓存处理serviceInfoHolder.processServiceInfo(result);returnresult;}这个方法是不是很眼熟?我们已经谈过了。看似殊途同归,查询服务列表和订阅最终都调用了同一个方法。上一篇文章讲了其他的流程。这里重点关注任务调度://ServiceInfoUpdateServicepublicvoidscheduleUpdateIfAbsent(StringserviceName,StringgroupName,Stringclusters){StringserviceKey=ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName,groupName),clusters);if(futureMap.get(serviceKey)!=null){return;}synchronized(futureMap){if(futureMap.get(serviceKey)!=null){return;}//构建UpdateTaskScheduledFuturefuture=addTask(newUpdateTask(serviceName,groupName,clusters));futureMap.put(serviceKey,future);}}该方法包括构造serviceKey,通过serviceKey判断权重,最后加入UpdateTask。addTask的实现是发起一个定时任务://ServiceInfoUpdateServiceprivatesynchronizedScheduledFutureaddTask(UpdateTasktask){returnexecutor.schedule(task,DEFAULT_DELAY,TimeUnit.MILLISECONDS);}定时任务延时1秒执行。跟踪到此结束。核心功能只有两个:调用订阅方法和发起定时任务。定时任务做了什么?UpdateTask封装了订阅机制的核心业务逻辑。我们先通过一张流程图来看一下到底做了什么。有了上面的流程图,nacos就基本清楚UpdateTask是干什么的了。直接贴run方法的所有代码:publicvoidrun(){longdelayTime=DEFAULT_DELAY;try{//判断注册的Service是否订阅,如果没有订阅,不再执行if(!changeNotifier.isSubscribed(groupName,serviceName,clusters)&&!futureMap.containsKey(serviceKey)){NAMING_LOGGER.info("updatetaskisstopped,service:"+groupedServiceName+",clusters:"+clusters);return;}//获取缓存的服务信息ServiceInfoserviceObj=serviceInfoHolder.getServiceInfoMap().get(serviceKey);if(serviceObj==null){//根据serviceName,从注册服务器获取Service信息serviceObj=namingClientProxy.queryInstancesOfService(serviceName,groupName,clusters,0,false);serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime=serviceObj.getLastRefTime();return;}//过期服务(服务最新更新时间小于等于缓存刷新时间),重新从注册中心查询if(serviceObj.getLastRefTime()<=lastRefTime){serviceObj=namingClientProxy.queryInstancesOfService(serviceName,groupName,clusters,0,false);//处理服务消息serviceInfoHolder.processServiceInfo(serviceObj);}//刷新更新时间lastRefTime=serviceObj.getLastRefTime();if(CollectionUtils.isEmpty(serviceObj.getHosts())){incFailCount();return;}//下次更新缓存时间设置,默认6秒//TODOmultipletime可配置.delayTime=serviceObj.getCacheMillis()*DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;//重置失败的次数为0refresh,下次执行的时间与failCount相关//failCount=0,下次调度时间为6秒,最多1分钟//即在没有异常执行者的情况下,缓存实例的刷新时间为6秒。schedule(this,Math.min(delayTime<eventListeners=listenerMap.get(key);returnCollectionUtils.isNotEmpty(eventListeners);}查看该方法的源码会发现这里的listenerMap是原来的subscribe方法RegisterListener注册的EventListenerrun方法背后的业务处理基本相同。首先判断缓存中是否有ServiceInfo信息。如果不是,查询注册中心,处理ServiceInfo,更新最后处理时间。接下来判断ServiceInfo是否无效,就是通过比较当前ServiceInfo中的“最后更新时间”和“最后更新时间”来判断。如果失败,还会进行查询注册中心、处理ServiceInfo、更新上次处理时间等一系列操作。业务逻辑最终会计算出下一个定时任务的执行时间,通过delayTime延迟执行。delayTime默认为1000L*6,即6秒。最后它真的启动了下一个定时任务。发生异常时,下次执行时间与失败次数有关,但最长不超过1分钟。小结在这篇文章中,我们讲到了Nacos客户端服务订阅机制的源码,主要包括以下步骤:第一步:调用订阅方法,注册EventListener,然后使用UpdateTask进行判断;第二步:通过委托代理类处理订阅逻辑。这里与获取实例列表的方法相同;第三步:通过定时任务执行UpdateTask方法。默认执行间隔为6秒。当出现异常时,会延长,但不会超过1分钟;第四步:UpdateTask方法会比较本地是否有缓存,缓存是否过期。不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。第五步:重新计算定时任务时间,循环执行上述过程。