当前位置: 首页 > 后端技术 > Java

Nacos配置中心集群原理及源码分析

时间:2023-04-01 23:31:15 Java

Nacos作为配置中心,必须保证服务节点的高可用,那么Nacos是如何实现集群的呢?下图是Nacos集群的部署图。Nacos集群工作原理在以Nacos为配置中心的集群结构中,是一种去中心化节点的设计。由于没有主从节点,也没有选举机制,为了实现热备份,需要增加一个虚拟IP(VIP)。Nacos的数据存储分为Mysql数据库存储两部分。所有Nacos节点共享相同的数据。数据副本机制由Mysql自带的主从方案解决,保证数据的可靠性。每个节点的本地磁盘会保存全量数据,具体路径:/data/program/nacos-1/data/config-data/${GROUP}。在Nacos的设计中,Mysql是一个中央数据仓库。并且认为Mysql中的数据是绝对正确的。另外Nacos在启动的时候会把Mysql中的数据拷贝一份到本地磁盘。这种设计的好处是可以提高性能。当客户端需要请求某个配置项时,服务端会要求Ian从磁盘中读取相应的文件并返回,磁盘的读取效率高于数据库。当配置发生变化时:Nacos会将变化的配置保存到数据库中,然后写入到本地文件中。然后向集群中的其他节点发送HTTP请求。其他节点收到事件后,将刚刚从Mysql写入的数据转储到本地文件中。另外NacosServer启动后,会同步启动一个定时任务。每6小时,它会dump全量数据到本地文件配置变更同步入口。当修改、删除或添加配置时,将发出notifyConfigChange事件。@PostMapping@Secured(action=ActionTypes.WRITE,parser=ConfigResourceParser.class)publicBooleanpublishConfig(HttpServletRequest请求,HttpServletResponse响应,@RequestParam(value="dataId")StringdataId,@RequestParam(value="group")字符串组,@RequestParam(value="tenant",required=false,defaultValue=StringUtils.EMPTY)Stringtenant,@RequestParam(value="content")Stringcontent,@RequestParam(value="tag",required=false)字符串标签,@RequestParam(value="appName",required=false)StringappName,@RequestParam(value="src_user",required=false)StringsrcUser,@RequestParam(value="config_tags",required=false)字符串configTags,@RequestParam(value="desc",required=false)Stringdesc,@RequestParam(value="use",required=false)Stringuse,@RequestParam(value="effect",required=false)String效果,@RequestParam(value="type",required=false)String类型,@RequestParam(value="schema",required=false)Stringschema)throwsNacosException{//省略..if(StringUtils.isBlank(betaIps)){if(StringUtils.isBlank(tag)){persistService.insertOrUpdate(srcIp,srcUser,configInfo,time,configAdvanceInfo,true);ConfigChangePublisher.notifyConfigChange(newConfigDataChangeEvent(false,dataId,group,tenant,time.getTime()));}else{persistService.insertOrUpdateTag(configInfo,tag,srcIp,srcUser,time,true);ConfigChangePublisher.notifyConfigChange(newConfigDataChangeEvent(false,dataId,group,tenant,tag,time.getTime()));}}//省略returntrue;}AsyncNotifyService配置数据变化事件,有专门的监听器AsyncNotifyService,处理数据变化后的同步事件@AutowiredpublicAsyncNotifyService(ServerMemberManagermemberManager){this.memberManager=memberManager;//将ConfigDataChangeEvent注册到NotifyCenter。NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class,NotifyCenter.ringBufferSize);//注册订阅者以订阅ConfigDataChangeEvent。NotifyCenter.registerSubscriber(newSubscriber(){@OverridepublicvoidonEvent(Eventevent){//并发生成ConfigDataChangeEventif(eventinstanceofConfigDataChangeEvent){ConfigDataChangeEventevt=(ConfigDataChangeEvent)event;longdumpTs=evt.lastModifiedTs;StringdataId=evt.dataId;Stringgroup=evt.group;Stringtenant=evt.tenant;Stringtag=evt.tag;CollectionipList=memberManager.allMembers();//得到群中的ip列表//构建NotifySingleTask并将其添加到队列中队列queue=newLinkedList();for(Membermember:ipList){//遍历集群中的每个节点queue.add(newNotifySingleTask(dataId,group,tenant,tag,dumpTs,member.getAddress(),evt.isBeta));}//异步任务执行AsyncTaskConfigExecutor.executeAsyncNotify(newAsyncTask(nacosAsyncRestTemplate,queue));}}@Override公共类subscribeType(){返回ConfigDataChangeEvent.class;}});}AsyncTask@Overridepublicvoidrun(){executeAsyncInvoke();}privatevoidexecuteAsyncInvoke(){while(!queue.isEmpty()){//遍历队列中的数据,直到数据为空NotifySingleTasktask=队列.poll();//获取任务StringtargetIp=task.getTargetIP();//获取目标ipif(memberManager.hasMember(targetIp)){//如果集群中的ip列表中包含目标ip//开始健康检查,有未被监控的ip,直接放入通知队列,否则通知//判断目标ip的健康状态booleanunHealthNeedDelay=memberManager.isUnHealth(targetIp);//if(unHealthNeedDelay){//如果目标服务不健康,则继续加入队列,延迟一段时间后再执行//目标ip不健康,则放入通知列表ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0,task.target);//获取延迟时间并将失败计数设置为任务asyncTaskExecute(task);}else{//构造headerHeaderheader=Header.newInstance();header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,String.valueOf(task.getLastModified()));header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP,InetUtils.getSelfIP());如果(task.isBeta){header.addParam("isBeta","true");}AuthHeaderUtil.addIdentityToHeader(header);//通过restTemplate发起远程调用。如果调用成功,则执行AsyncNotifyCallBack的回调方法restTemplate.get(task.url,header,Query.EMPTY,String.class,newAsyncNotifyCallBack(task));}}}}目标节点收到请求数据同步请求地址为task.url=http://192.168.8.16:8848/naco...@GetMapping("/dataChange")publicBooleannotifyConfigInfo(HttpServletRequestrequest,@RequestParam("dataId")StringdataId,@RequestParam("group")String组,@RequestParam(value="tenant",required=false,defaultValue=StringUtils.EMPTY)Stringtenant,@RequestParam(value="tag",required=false)字符串标签){dataId=dataId.trim();group=group.trim();StringlastModified=request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);longlastModifiedTs=StringUtils.isEmpty(lastModified)?-1:Long.parseLong(lastModified);字符串handleIp=请求。getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);StringisBetaStr=request.getHeader("isBeta");如果(StringUtils.isNotBlank(isBetaStr)&&trueStr.equals(isBetaStr)){dumpService.dump(dataId,group,tenant,lastModifiedTs,handleIp,true);}else{//dumpService.dump(dataId,group,tenant,tag,lastModifiedTs,handleIp);}returntrue;}dumpService.dump用于更新配置,代码如下,当前任务会添加到DumpTaskMgr中管理publicvoiddump(StringdataId,Stringgroup,Stringtenant,Stringtag,longlastModified,StringhandleIp,booleanisBeta){StringgroupKey=GroupKey2.getKey(dataId,group,tenant);}StringtaskKey=String.join("+",dataId,group,tenant,String.valueOf(isBeta),tag);dumpTaskMgr.addTask(taskKey,newDumpTask(groupKey,tag,lastModified,handleIp,isBeta));DUMP_LOG.info([dump-task]addtask.groupKey={},taskKey={}",groupKey,taskKey);}TaskManager.addTask,先调用父类完成任务添加。@OverridepublicvoidaddTask(Objectkey,AbstractDelayTasknewTask){super.addTask(key,newTask);MetricsMonitor.getDumpTaskMonitor().set(tasks.size());}在这个场景设计中,producer消费一般都是以operator的方式来完成的,所以这里不难猜到task会被保存在队列中,然后由另一个线程执行。NacosDelayTaskExecuteEngineTaskManager的父类是NacosDelayTaskExecuteEngine。该类有一个成员属性protectedfinalConcurrentHashMaptasks;,专门用来保存延时任务类型AbstractDelayTask。在该类的构造函数中,初始化了一个延迟任务,具体任务为ProcessRunnable.publicNacosDelayTaskExecuteEngine(Stringname,intinitCapacity,Loggerlogger,longprocessInterval){super(logger);tasks=newConcurrentHashMap(initCapacity);processingExecutor=ExecutorFactory.newSingleThreadScheduled((名称));processingExecutor.scheduleWithFixedDelay(newProcessRunnable(),processInterval,processInterval,TimeUnit.MILLISECONDS);}ProcessRunnableprivateclassProcessRunnableimplementsRunnable{@Overridepublicvoidrun(){try{processTasks();}}catch(Throwablee){getEngineLog().error(e.toString(),e);}}}processTasksprotectedvoidprocessTasks(){//获取所有任务Collectionkeys=getAllTask??Keys();for(ObjecttaskKey:keys){AbstractDelayTasktask=removeTask(taskKey);如果(空==任务){继续;}//获取Task处理器,这里返回DumpProcessorNacosTaskProcessorprocessor=getProcessor(taskKey);if(null==processor){getEngineLog().error("找不到任务的处理器,因此被丢弃。"+task);继续;}try{//ReAddtaskifprocessfailed//执行特定任务if(!processor.process(task)){retryFailedTask(taskKey,task);}}catch(Throwablee){getEngineLog().error("Nacos任务执行错误:"+e.toString(),e);retryFailedTask(taskKey,任务);}}}DumpProcessor.process从数据库中读取最新数据,然后更新本地缓存和磁盘。两者都在CCBY-NC-SA4.0下获得许可。转载请注明来自Mic带你学建筑!如果本文对您有帮助,请给个关注和点赞。您的坚持是我不断创作的动力。欢迎关注同名微信公众号获取更多技术干货!