当前位置: 首页 > 科技观察

Nacos和Apollo的长轮询定时机制就是这么好用~

时间:2023-03-13 13:44:19 科技观察

www.ydisp.cn/oss/202207/14/979122c877ff4de792289561be60142173cf01.jpg"alt="picture"title="picture"style="width:640px;能见度:可见;height:217px;"data-type="block">ConfigService是Nacos客户端提供的访问和实现配置中心基本操作的类,我们从ConfigService1的实例化开始,开始长轮询定时机制。客户端的长轮询定时机制,我们从NacosPropertySourceLocator.locate()开始[断点步进]:1.1使用反射机制实例化NacosConfigService对象客户端的长轮询定时任务在NacosFactory.createConfigService()方法中,启动在构建ConfigService对象实例时,继续1.1的源码;进入NacosFactory.createConfigService():publicstaticConfigServicecreateConfigService(Propertiesproperties)throwsNacosException{//【断点步骤】创建ConfigServicereturnConfigFactory.createConfigService(properties);}进入ConfigFactory.createConfigService(),发现它使用了反射机制来实例化NacosConfigService目的;1.2在NacosConfigService的构造方法中,启动长轮询定时任务,进入NacosConfigService.NacosConfigService()构造方法,里面设置了一些比较远程的Task相关的属性;1.2.1初始化HttpAgentMetricsHttpAgent类设计如下:ServerHttpAgent类设计如下:1.2.2初始化ClientWorker进入ClientWorkerker.ClientWorker()的构造方法主要是创建两个定时线程池,启动一个定时任务;进入ClientWorker.checkConfigInfo()每隔10s检查配置是否有变化;cacheMap:是一个AtomicReference>对象,用于存放监控变化的缓存集合,key是根据datalD/group/tenant(租户)拼接的value,Value是其中的内容Nacos服务器上存储的相应配置文件;长轮询任务拆分:默认情况下,每个LongPollingRunnable任务处理3000个监听器配置集。如果超过3000个,则需要启动多个LongPollingRunnables来执行;1.3检查配置变更,读取变更后的配置。该方法的主要逻辑是:根据taskld拆分cacheMap数据;然后使用checkLocalConfig()方法比较本地配置文件(在${user}\nacos\config\)中的数据是否有变化,如果有变化,直接触发通知;publicvoidrun(){ListcacheDatas=newArrayList();ArrayListinInitializingCacheList=newArrayList();try{//遍历CacheData,检查本地配置Iteratorvar3=((Map)ClientWorker.this.cacheMap.get()).values().iterator();while(var3.hasNext()){CacheDatacacheData=(CacheData)var3.next();if(cacheData.getTaskId()==this.taskId){cacheDatas.add(cacheData);try{//检查本地配置ClientWorker.this.checkLocalConfig(cacheData);if(cacheData.isUseLocalConfigInfo()){cacheData.checkListenerMd5();}}catch(Exceptionvar13){ClientWorker.LOGGER.error("获取本地配置信息错误",var13);}}}//【断点进入1.3.1】通过长轮询请求是否检查到server对应的配置ChangesoccurredListchangedGroupKeys=ClientWorker.this.checkUpdateDataIds(cacheDatas,inInitializingCacheList);//遍历变化的groupKeys并重新加载最新的数据Iteratorvar16=changedGroupKeys.iterator();while(var16.hasNext()){StringgroupKey=(String)var16.next();String[]key=GroupKey.parseKey(groupKey);字符串dataId=key[0];字符串组=键[1];字符串租户=空;如果(key.length==3){tenant=key[2];}try{//【断点进入1.3.2】读取变更配置,其中dataId、group、tenant为【1.3.1】中获取的Stringcontent=ClientWorker.this.getServerConfig(dataId,group,tenant,3000L);CacheData缓存=(CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId,group,tenant));cache.setContent(内容);ClientWorker.LOGGER.info("[{}][数据接收]dataId={},group={},tenant={},md5={},content={}",newObject[]{ClientWorker.this.agent.getName(),dataId,group,tenant,cache.getMd5(),ContentUtils.truncateContent(content)});}catch(NacosExceptionvar12){Stringmessage=String.format("[%s][get-update]getchangedconfigexception.dataId=%s,group=%s,tenant=%s",ClientWorker.this.agent.getName(),dataId,组,租户);ClientWorker.LOGGER.error(message,var12);}}//触发事件通知var16=cacheDatas.iterator();while(true){CacheData缓存数据;做{如果(!var16.hasNext()){inInitializingCacheList.clear();//继续指定时间执行当前线程序ClientWorker.this.executorService.execute(this);返回;}cacheDatax=(CacheData)var16.next();}while(cacheDatax.isInitializing()&&!inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId,cacheDatax.group,cacheDatax.tenant)));cacheDatax.checkListenerMd5();cacheDatax.setInitializing(false);}}catch(Throwablevar14){ClientWorker.LOGGER.error("longPollingerror:",var14);ClientWorker.this.executorService。计划(这,(长)ClientWorker.this.taskPenaltyTime,TimeUnit.MILLISECONDS);}}注意:这里的断点需要在Nacos服务器上修改(间隔大于30s),进入后容易理解;1.3.1检查配置更改ClientWorker.checkUpdateDataIds(),我们点进ClientWorker.checkUpdateDataIds()方法,发现最后调用了ClientWorker.checkUpdateConfigStr()方法。实现逻辑和源码如下:通过MetricsHttpAgent.httpPost()方法(上面1.2.1中有提到)调用/v1/cs/configs/listener接口实现长轮询请求;长轮询请求只是在实现层面设置了比较长的超时时间,默认是30s;如果服务器上的数据发生变化,客户端会收到一个Http结果,服务端返回DataID、Group、Tenant,数据发生变化;获取到这些信息后,在LongPollingRunnable.run()方法中调用getServerConfig(),读取Nacos服务器上的具体配置内容;ListcheckUpdateConfigStr(StringprobeUpdateString,booleanisInitializingCacheList)throwsIOException{Listparams=Arrays.asList("Listening-Configs",probeUpdateString);}列表headers=newArrayList(2);headers.add("长拉超时");headers.add(""+this.timeout);if(isInitializingCacheList){headers.add("长拉超时-不挂断");headers.add("true");}if(StringUtils.isBlank(probeUpdateString)){returnCollections.emptyList();}else{try{//调用/v1/cs/configs/listener接口实现长轮询请求,返回的HttpResult包含数据变化的DataID、Group、TenantHttpResultresult=this.agent.httpPost("/v1/cs/configs/listener",headers,params,this.agent.getEncode(),this.timeout);如果(200==结果代码){this.setHealthServer(true);//返回this.parseUpdateDataIdResponse(result.content);}this.setHealthServer(false);LOGGER.error("[{}][check-update]getchangeddataIderror,code:{}",this.agent.getName(),result.code);}catch(IOExceptionvar6){this.setHealthServer(false);LOGGER.error("["+this.agent.getName()+"][check-update]getchangeddataIdexception",var6);抛出var6;}返回Collections.emptyList();}}1.3.2读取修改后的配置ClientWorker.getServerConfig()进入ClientWorker.getServerConfig()方法;读取服务器更改配置;最后调用MetricsHttpAgent.httpGet()方法(上文1.2.1中提到),调用/v1/cs/configs接口获取配置;然后将更改的配置保存到本地;2.服务端的长轮询定时机制2.1服务端收到请求ConfigController.listener()Nacos客户端通过HTTP协议与服务端通信,所以服务端源码中必须有相应接口的实现;nacos-config模块下的controller包提供了一个ConfigController类来处理请求,它有一个/listener接口,是客户端发起数据监控的接口。其主要逻辑及源码如下:获取客户端需要监控的可能发生变化的配置,计算MD5值;ConfigServletInner.doPollingConfig()开始执行长轮询请求;2.2执行长轮询请求ConfigServletInner.doPollingConfig()进入ConfigServletInner.doPollingConfig()方法,封装了长轮询的实现逻辑,兼容短轮询逻辑;进入LongPollingService.addLongPollingClient()方法,其中包含了长轮询的核心处理逻辑,主要作用是将客户端的长轮询请求封装成ClientPolling,交给调度器执行;2.3创建线程执行定时任务ClientLongPolling.run()我们找到了ClientLongPolling.run()方法,它可以体现长轮询计时机制的核心原理,通俗地说就是:服务端收到请求后,它不会立即返回,如果没有变化,会延迟(30-0.5)s后将请求结果返回给客户端;这使得客户端和服务器在30s内建立连接2.4监听配置更改事件2.4.1监听LocalDataChangeEvent事件的实现当我们在Nacos服务器上或者通过API更改配置时,会发布一个LocalDataChangeEvent事件,该事件会被LongPollingService监听捕获;这里,LongPollingService为什么有监听功能,在1.3.1版本之后有一些变化:1.3.1之前:LongPollingService.onEvent();1.3.1之后:Subscriber.onEvent();Nacos1.3.1版本之前,是通过继承LongPollingServiceAbstractEventListener实现监听,重写onEvent()方法;1.3.2版本之后,通过构造订阅者,实现监听LocalDataChangeEvent事件,通过线程池执行DataChangeTask任务,效果是一样的;2.4.2监听事件后处理逻辑DataChangeTask。run()找到了DataChangeTask.run()方法,实现了这个线程任务3.源码结构图总结3.1客户端长轮询定时机制NacosPropertySourceLocator.locate():初始化ConfigService对象,定位配置;NacosConfigService.NacosConfigService():NacosConfigService的构造方法;Executors.newScheduledThreadPool():创建一个executor线程池;Executors.newScheduledThreadPool():创建一个executorService线程池;ClientWorker.checkConfigInfo():使用executor线程池检查配置是否发生变化;ClientWorker.checkLocalConfig():检查本地配置;ClientWorker.checkUpdateDataIds():检查server对应的configuration是否有变化;ClientWorker.getServerConfig():读取变化的配置MetricsHttpAgent.httpPost():调用/v1/cs/configs/listener接口实现长轮询请求;ClientWorker.checkUpdateConfigStr():检查server对应的配置是否变化;MetricsHttpAgent.httpGet():调用/v1/cs/configs接口获取配置;LongPollingRunnable.run():运行长轮询定时线程;MetricsHttpAgent.MetricsHttpAgent():初始化HttpAgent;ClientWorker.ClientWorker():初始化ClientWorker;NacosFactory.createConfigService():创建配置服务器;ConfigFactory.createConfigService():使用反射机制创建配置服务器;3.2服务端长轮询定时机制ConfigController.listener():服务端接收到请求;LongPollingService.addLongPollingClient():长轮询的核心处理逻辑,提前500ms返回响应;ClientLongPolling.run():长轮询计时机制的实现逻辑;Map.put():将ClientLongPolling实例本身添加到allSubs队列中;队列。remove():从allSubs队列中移除ClientLongPolling实例本身;MD5Util.compareMd5():比较数据的MD5值;LongPollingService.sendResponse():将改变后的结果通过response返回给客户端;ConfigExecutor.scheduleLongPolling():启动一个定时任务,延迟时间为29.5s;HttpServletRequest.getHeader():获取客户端设置的请求超时时间;MD5Util.compareMd5():将MD5与服务器数据进行比较;ConfigExecutor.executeLongPolling():创建一个ClientLongPolling线程来执行定时任务;MD5Util.getClientMd5Map():计算MD5值;ConfigServletInner.doPollingConfig():执行长轮询请求;3.3Nacos服务器配置变更事件监听Nacos服务器上的配置发生变更后,发布一个LocalDataChangeEvent事件;Subscriber.onEvent():监听LocalDataChangeEvent事件(1.3.2版本后);DataChangeTask.run():根据groupKey返回配置;ConfigExecutor.executeLongPolling():通过线程池执行DataChangeTask任务;