当前位置: 首页 > 科技观察

Nacos和Apollo中的长轮询定时机制很有用

时间:2023-03-16 11:43:35 科技观察

今天本文介绍Nacos配置中心的其中一个原理:长轮询机制的应用方便理解和表达,这里我们使用Nacos控制台Nacos注册中心称为Nacos服务器(即web界面),我们编写的业务服务称为Nacso客户端;Nacos动态监控长轮询机制示意图。本文将围绕这张图分析长轮询的定时机制原理:ConfigService是Nacos客户端提供的一个类,用于访问和实现配置中心的基本操作。我们将从ConfigService的实例化开始长轮询计时机制的源码之旅;1、客户端的长轮询定时机制我们从NacosPropertySourceLocator.locate()开始【断点介入】:1.1使用反射机制实例化NacosConfigService对象。客户端的长轮询定时任务是在NacosFactory.createConfigService()方法中构造ConfigService对象实例时启动的。进入1.1进入NacosFactory.createConfigService()源码:publicstaticConfigServicecreateConfigService(Propertiesproperties)throwsNacosException{//【断点步进】创建ConfigServicereturnConfigFactory.createConfigService(properties);}进入ConfigFactory.createConfigService()and找到其使用反射机制实例化NacosConfigService对象;1.2在NacosConfigService的构造方法中,启动长轮询定时任务,进入NacosConfigService.NacosConfigService()构造方法,设置一些与远程任务相关的属性;1.2.1初始化HttpAgentMetricsHttpAgent类设计如下:ServerHttpAgent类的设计如下:1.2.2初始化ClientWorker,进入ClientWorker.ClientWorker()构造方法主要是创建两个线程池进行定时调度,并启动一个定时任务;进入ClientWorker.checkConfigInfo()每隔10s检查配置是否有变化;cacheMap:是一个AtomicReference>对象,用于存放监控变化的缓存集合,key是根据datalD/group/tenant(租户)拼接的value,Value是内容Nacos服务器上存储的相应配置文件;长轮询任务拆分:默认情况下,每个LongPollingRunnable任务处理3000个监听器配置集。如果超过3000个,则需要启动多个LongPollingRunnables来执行;1.3检查配置变更,读取变更后的配置。该方法的主要逻辑是:根据taskld拆分cacheMap数据;然后使用checkLocalConfig()方法比较本地配置文件(在${user}\nacos\config\)中的数据是否有变化,如果有变化,则直接触发Notification;publicvoidrun(){ListcacheDatas=newArrayList();ArrayListinInitializingCacheList=newArrayList();try{//遍历CacheData,检查本地配置Iteratorvar3=((Map)ClientWorker.this.cacheMap.get()).values().iterator();while(var3.hasNext()){CacheDatacacheData=(CacheData)var3.next();if(cacheData.getTaskId()==this.taskId){缓存数据。添加(缓存数据);try{//检查本地配置ClientWorker.this.checkLocalConfig(cacheData);if(cacheData.isUseLocalConfigInfo()){cacheData.checkListenerMd5();}}catch(Exceptionvar13){ClientWorker.LOGGER.error("获取本地配置信息错误",var13);}}}//【断点进入1.3.1】通过长轮询请求是否检查到server对应的配置ChangesoccurredListchangedGroupKeys=ClientWorker.this.checkUpdateDataIds(cacheDatas,inInitializingCacheList);//遍历变化的groupKeys并重新加载最新的数据Iteratorvar16=changedGroupKeys.iterator();while(var16.hasNext()){StringgroupKey=(String)var16.next();String[]key=GroupKey.parseKey(groupKey);字符串dataId=key[0];字符串组=键[1];字符串租户=空;如果(key.length==3){tenant=key[2];}try{//【断点进入1.3.2】读取变更配置,其中dataId、group、tenant为【1.3.1】中获取的Stringcontent=ClientWorker.this.getServerConfig(dataId,group,tenant,3000L);CacheData缓存=(CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId,group,tenant));cache.setContent(内容);ClientWorker.LOGGER.info("[{}][数据接收]dataId={},group={},tenant={},md5={},content={}",newObject[]{ClientWorker.this.agent.getName(),dataId,group,tenant,cache.getMd5(),ContentUtils.truncateContent(content)});}catch(NacosExceptionvar12){Stringmessage=String.format("[%s][get-update]getchangedconfigexception.dataId=%s,group=%s,tenant=%s",ClientWorker.this.agent.getName(),dataId,组,租户);ClientWorker.LOGGER.error(message,var12);}}//触发事件通知var16=cacheDatas.iterator();while(true){CacheData缓存数据;做{如果(!var16.hasNext()){inInitializingCacheList.clear();//继续指定时间执行当前线程序ClientWorker.this.executorService.execute(this);返回;}cacheDatax=(CacheData)var16.next();}while(cacheDatax.isInitializing()&&!inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId,cacheDatax.group,cacheDatax.tenant)));cacheDatax.checkListenerMd5();cacheDatax.setInitializing(false);}}catch(Throwablevar14){ClientWorker.LOGGER.error("longPollingerror:",var14);ClientWorker.this.executorService。计划(这,(长)ClientWorker.this.taskPenaltyTime,TimeUnit.MILLISECONDS);}}注意:这里的断点需要在Nacos服务器上修改(间隔大于30s),进入后容易理解;1.3.1检查配置更改ClientWorker.checkUpdateDataIds(),我们点进ClientWorker.checkUpdateDataIds()方法,发现最后调用了ClientWorker.checkUpdateConfigStr()方法。实现逻辑和源码如下:通过MetricsHttpAgent.httpPost()方法(上面1.2.1有提到)调用/v1/cs/configs/listener接口实现长轮询请求;长轮询请求只是在实现层面设置了比较长的超时时间,默认是30s;如果服务器上的数据发生变化,客户端将收到一个HttpResult,服务端返回DataID、Group、Tenant数据变化;获取到这些信息后,在LongPollingRunnable.run()方法中调用getServerConfig(),读取Nacos服务器上的具体配置内容;ListcheckUpdateConfigStr(StringprobeUpdateString,booleanisInitializingCacheList)throwsIOException{Listparams=Arrays.asList("Listening-Configs",probeUpdateString);}列表headers=newArrayList(2);headers.add("长拉超时");headers.add(""+this.timeout);if(isInitializingCacheList){headers.add("长拉超时-不挂断");headers.add("true");}if(StringUtils.isBlank(probeUpdateString)){returnCollections.emptyList();}else{try{//调用/v1/cs/configs/listener接口实现长轮询请求,返回的HttpResult包含数据变化的DataID、Group、TenantHttpResultresult=this.agent.httpPost("/v1/cs/configs/listener",headers,params,this.agent.getEncode(),this.timeout);如果(200==结果代码){this.setHealthServer(true);//返回this.parseUpdateDataIdResponse(result.content);}this.setHealthServer(false);LOGGER.error("[{}][check-update]getchangeddataIderror,code:{}",this.agent.getName(),result.code);}catch(IOExceptionvar6){this.setHealthServer(false);LOGGER.error("["+this.agent.getName()+"][check-update]getchangeddataIdexception",var6);抛出var6;}返回Collections.emptyList();}}1.3.2读取修改后的配置ClientWorker.getServerConfig()进入ClientWorker.getServerConfig()方法;读取服务器配置的变化;最后调用的是MetricsHttpAgent.httpGet()方法(上文1.2.1中提到),调用/v1/cs/configs接口获取配置;然后通过调用LocalConfigInfoProcessor.saveSnapshot()将更改的配置保存在本地;2.服务端的长轮询定时机制2.1服务端收到请求ConfigController.listener()Nacos客户端通过HTTP协议与服务端通信,所以服务端源码中必须有相应接口的实现;nacos-config模块下的controller包提供了一个ConfigController类,用来处理请求,还有一个/listener接口,由客户端发送数据监控接口,其主要逻辑和源码如下:获取客户端需要监控的可能变化的配置,计算MD5值;ConfigServletInner.doPollingConfig()开始执行长轮询请求;2.2执行长轮询请求ConfigServletInner。doPollingConfig()进入ConfigServletInner.doPollingConfig()方法,封装了长轮询的实现逻辑,兼容短轮询逻辑;进入LongPollingService.addLongPollingClient()方法,其中包含了长轮询的核心处理逻辑,主要作用是将客户端的长轮询请求封装成ClientPolling,交给调度器执行;2.3创建一个线程来执行定时任务ClientLongPolling.run()我们找到了ClientLongPolling.run()方法,这个方法可以体现长轮询计时机制的核心原理,通俗地说就是:服务端收到请求后,它不会立即返回,如果没有变化,会延迟(30-0.5)s后将请求结果返回给客户端;这使得客户端和服务器之间的数据在30秒内。有变化时,始终处于连接状态;2.4监听配置更改事件2.4.1监听LocalDataChangeEvent事件的实现当我们在Nacos服务端或者通过API更改配置时,会释放一个LocalDataChangeEvent事件,该事件会被LongPollingService监听;这里为什么LongPollingService有监听功能呢?1.3.1版本之后的一些变化:1.3.1之前:LongPollingService.onEvent();1.3.1之后:Subscriber.onEvent();Nacos1.3.1版本之前,LongPollingService继承AbstractEventListener实现监听,覆盖了onEvent()方法;1.3.2版本以后,通过构造订阅者,实现监听LocalDataChangeEvent事件,通过线程池执行DataChangeTask任务,效果是一样的;2.4.2监听事件逻辑DataChangeTask.run()后的处理我们找到了DataChangeTask.run()方法,实现了这个线程任务3.源码结构图总结3.1Client长轮询定时机制NacosPropertySourceLocator.locate():初始化ConfigService对象,定位配置;NacosConfigService.NacosConfigService():NacosConfigService的构造方法;Executors.newScheduledThreadPool():创建一个executor线程池;Executors.newScheduledThreadPool():创建一个executorService线程池;ClientWorker.checkConfigInfo():使用executor线程池检查配置是否发生变化;ClientWorker.checkLocalConfig():检查本地配置;ClientWorker.checkUpdateDataIds():检查server对应的configuration是否有变化;ClientWorker.getServerConfig():读取变化的配置MetricsHttpAgent.httpPost():调用/v1/cs/configs/listener接口实现长轮询请求;客户工作者。checkUpdateConfigStr():检查服务器对应的配置是否发生变化;MetricsHttpAgent.httpGet():调用/v1/cs/configs接口获取配置;LongPollingRunnable.run():运行长轮询定时线程;MetricsHttpAgent.MetricsHttpAgent():初始化HttpAgent;ClientWorker.ClientWorker():初始化ClientWorker;NacosFactory.createConfigService():创建配置服务器;ConfigFactory.createConfigService():使用反射机制创建配置服务器;3.2服务端长轮询定时机制ConfigController.listener():ServerReceiverequest;LongPollingService.addLongPollingCclient():长轮询的核心处理逻辑,提前500ms返回响应;ClientLongPolling.run():长轮询计时机制的实现逻辑;Map.put():将ClientLongPolling实例本身添加到allSubs队列中;队列。remove():从allSubs队列中移除ClientLongPolling实例本身;MD5Util.compareMd5():比较数据的MD5值;LongPollingService.sendResponse():将改变后的结果通过response返回给客户端;ConfigExecutor.scheduleLongPolling():启动一个定时任务,延迟时间为29.5s;HttpServletRequest.getHeader():获取客户端设置的请求超时时间;MD5Util.compareMd5():将MD5与服务器数据进行比较;ConfigExecutor.executeLongPolling():创建一个ClientLongPolling线程来执行定时任务;MD5Util.getClientMd5Map():计算MD5值;ConfigServletInner.doPollingConfig():执行长轮询请求;3.3Nacos服务器配置变更事件监听Nacos服务器上的配置发生变更后,发布一个LocalDataChangeEvent事件;Subscriber.onEvent():监听LocalDataChangeEvent事件(1.3.2版本后);DataChangeTask.run():根据groupKey返回配置;ConfigExecutor.executeLongPolling():通过线程池执行DataChangeTask任务;