
NacosSync Bidirectional Replication: A Source Code Analysis

Time: 2023-03-11 22:27:17   Section: 科技观察 (Tech Watch)

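Introduction

Analyzing the open-source synchronization tool NacosSync gives us a reference for building a custom sync tool. This article walks through the source code for sync task dispatch, for synchronization between Nacos clusters, and for synchronization from ZooKeeper (zk) to Nacos.

1. Summary

Task and configuration storage
  - Cluster configuration storage
  - Sync task storage

Sync task dispatch
  - A scheduled job polls the task list every three seconds
  - A new task publishes a SyncTaskEvent, which is handled by listenerSyncTaskEvent
  - A deleted task publishes a DeleteTaskEvent, which is handled by listenerDeleteTaskEvent
  - Publishing and subscribing use Guava's EventBus

Sync logic between Nacos clusters
  - Implements synchronization between two Nacos clusters
  - A sync task works at the Service (AppId) level: it registers a listener on the source cluster and fetches the list of registered nodes
  - After removing invalid nodes, it registers the new nodes with the target cluster

Sync from a zk cluster to a Nacos cluster
  - From zk to Nacos, NacosSync supports only the dubbo path
  - On the first run it syncs all nodes, then listens for path changes on the source cluster and syncs them to the target cluster

2. Task and configuration storage

The storage part is simple, so only the entry classes and handler classes are listed.

Cluster configuration storage

Request entry: ClusterApi#clusterAdd
Handler: ClusterAddProcessor#process

    clusterAccessService.insert(clusterDO);

Sync task storage

Request entry: TaskApi#taskAdd
Handler: TaskAccessService#addTask

    taskAccessService.addTask(taskDO);

3. Sync task dispatch

Once a sync task has been stored, it has to be dispatched. The code is in QuerySyncTaskTimer, which implements Spring Boot's CommandLineRunner interface.

Scheduling the job

    public void run(String... args) {
        scheduledExecutorService.scheduleWithFixedDelay(
                new CheckRunningStatusThread(), 0, 3000, TimeUnit.MILLISECONDS);
    }

Note: the job starts immediately and then runs with a fixed delay of 3 seconds, that is, each run starts 3 seconds after the previous one finishes.

Executing the job

    private class CheckRunningStatusThread implements Runnable {
        @Override
        public void run() {
            Long start = System.currentTimeMillis();
            try {
                // Note @1
                Iterable<TaskDO> taskDOS = taskAccessService.findAll();
                taskDOS.forEach(taskDO -> {
                    // Note @2
                    if ((null != skyWalkerCacheServices.getFinishedTask(taskDO))) {
                        return;
                    }
                    // Note @3
                    if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) {
                        eventBus.post(new SyncTaskEvent(taskDO));
                        log.info("Found a sync task in the database, posting a sync event: " + taskDO);
                    }
                    // Note @4
                    if (TaskStatusEnum.DELETE.getCode().equals(taskDO.getTaskStatus())) {
                        eventBus.post(new DeleteTaskEvent(taskDO));
                        log.info("Found a delete task in the database, posting a delete event: " + taskDO);
                    }
                });
            } catch (Exception e) {
                log.warn("CheckRunningStatusThread Exception", e);
            }
            // Note @5
            metricsManager.record(MetricsStatisticsType.DISPATCHER_TASK,
                    System.currentTimeMillis() - start);
        }
    }

Note @1: query all sync tasks.
Note @2: skip tasks that have already completed.
Note @3: publish the sync event SyncTaskEvent.
Note @4: publish the delete event DeleteTaskEvent.
Note @5: record how long this scheduled run took, using the metrics manager.

Summary: the task table is checked every 3 seconds. When a new task or a deleted task is found, a sync event or a delete event is published through Guava's EventBus.

4. Sync event handling

EventListener#listenerSyncTaskEvent subscribes to the sync event SyncTaskEvent.

    @Subscribe
    public void listenerSyncTaskEvent(SyncTaskEvent syncTaskEvent) {
        try {
            long start = System.currentTimeMillis();
            // Note @6
            if (syncManagerService.sync(syncTaskEvent.getTaskDO())) {
                // Note @7
                skyWalkerCacheServices.addFinishedTask(syncTaskEvent.getTaskDO());
                // Note @8
                metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT,
                        System.currentTimeMillis() - start);
            } else {
                log.warn("listenerSyncTaskEvent sync failure");
            }
        } catch (Exception e) {
            log.warn("listenerSyncTaskEvent process error", e);
        }
    }

Note @6: execute the sync task.
Note @7: mark the sync task as finished.
Note @8: record the task's execution time.

EventListener#listenerDeleteTaskEvent subscribes to the delete event DeleteTaskEvent.

    @Subscribe
    public void listenerDeleteTaskEvent(DeleteTaskEvent deleteTaskEvent) {
        try {
            long start = System.currentTimeMillis();
            if (syncManagerService.delete(deleteTaskEvent.getTaskDO())) {
                skyWalkerCacheServices.addFinishedTask(deleteTaskEvent.getTaskDO());
                metricsManager.record(MetricsStatisticsType.DELETE_TASK_RT,
                        System.currentTimeMillis() - start);
            } else {
                log.warn("listenerDeleteTaskEvent delete failure");
            }
        } catch (Exception e) {
            log.warn("listenerDeleteTaskEvent process error", e);
        }
    }

Summary: listenerSyncTaskEvent and listenerDeleteTaskEvent have the same structure. Each executes the task logic, caches the task as finished, and then records the elapsed time.

5. Sync logic between Nacos clusters

First, synchronization between Nacos clusters. The code is in NacosSyncToNacosServiceImpl#sync.

Executing the sync task

    @Override
    public boolean sync(TaskDO taskDO) {
        String taskId = taskDO.getTaskId();
        try {
            // Note @7
            NamingService sourceNamingService =
                    nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
            // Note @8
            NamingService destNamingService =
                    nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());
            this.listenerMap.putIfAbsent(taskId, event -> {
                if (event instanceof NamingEvent) {
                    try {
                        // Note @9
                        List<Instance> sourceInstances = sourceNamingService.getAllInstances(
                                taskDO.getServiceName(),
                                getGroupNameOrDefault(taskDO.getGroupName()),
                                new ArrayList<>(), true);
                        // Note @10
                        this.removeInvalidInstance(taskDO, destNamingService, sourceInstances);
                        // Note @11
                        if (sourceInstances.isEmpty()) {
                            sourceInstanceSnapshot.remove(taskId);
                            return;
                        }
                        // Note @12
                        this.syncNewInstance(taskDO, destNamingService, sourceInstances);
                    } catch (Exception e) {
                        log.error("event process fail, taskId:{}", taskId, e);
                        metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
                    }
                }
            });
            sourceNamingService.subscribe(taskDO.getServiceName(),
                    getGroupNameOrDefault(taskDO.getGroupName()), listenerMap.get(taskId));
        } catch (Exception e) {
            log.error("sync task from nacos to nacos was failed, taskId:{}", taskId, e);
            metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
            return false;
        }
        return true;
    }

Note @7: obtain the NamingService of the source cluster.
Note @8: obtain the NamingService of the target cluster.
Note @9: fetch the instances registered for the service.
Note @10: remove invalid nodes first.

    private void removeInvalidInstance(TaskDO taskDO, NamingService destNamingService,
            List<Instance> sourceInstances) throws NacosException {
        String taskId = taskDO.getTaskId();
        if (this.sourceInstanceSnapshot.containsKey(taskId)) {
            // Note @10.1
            Set<String> oldInstanceKeys = this.sourceInstanceSnapshot.get(taskId);
            List<String> newInstanceKeys = sourceInstances.stream()
                    .map(this::composeInstanceKey).collect(Collectors.toList());
            // Note @10.2
            Collection<String> instanceKeys =
                    CollectionUtils.subtract(oldInstanceKeys, newInstanceKeys);
            for (String instanceKey : instanceKeys) {
                log.info("TaskId:{}, removing invalid synced instance:{}", taskId, instanceKey);
                String[] split = instanceKey.split(":", -1);
                // Note @10.3
                destNamingService.deregisterInstance(taskDO.getServiceName(),
                        getGroupNameOrDefault(taskDO.getGroupName()),
                        split[0], Integer.parseInt(split[1]));
            }
        }
    }

Note @10.1: read the cached keys of the previously synced nodes.
Note @10.2: compute the invalid nodes, that is, the old keys that no longer appear among the new keys.
Note @10.3: deregister the invalid nodes from the target cluster.
Note @11: if the instance list is empty, every instance of the service has gone offline, so the local snapshot is cleared.
Note @12: sync the new instances to the target cluster and update the cache.

The beginning of the following listing is truncated in the source. The method signature is reconstructed from the call at note @12, and the concrete set type is an assumption.

    private void syncNewInstance(TaskDO taskDO, NamingService destNamingService,
            List<Instance> sourceInstances) throws NacosException {
        // Set implementation assumed; the source listing is truncated here.
        Set<String> latestSyncInstance = new HashSet<>();
        // Add the new instances
        String taskId = taskDO.getTaskId();
        // Note @12.1
        Set<String> instanceKeys = sourceInstanceSnapshot.get(taskId);
        // Note @12.2
        for (Instance instance : sourceInstances) {
            if (needSync(instance.getMetadata())) {
                String instanceKey = composeInstanceKey(instance);
                // Note @12.3
                if (CollectionUtils.isEmpty(instanceKeys) || !instanceKeys.contains(instanceKey)) {
                    destNamingService.registerInstance(taskDO.getServiceName(),
                            getGroupNameOrDefault(taskDO.getGroupName()),
                            buildSyncInstance(instance, taskDO));
                }
                // Note @12.4
                latestSyncInstance.add(instanceKey);
            }
        }
        if (CollectionUtils.isNotEmpty(latestSyncInstance)) {
            log.info("TaskId:{}, number of synced instances:{}", taskId, latestSyncInstance.size());
            // Note @12.5
            sourceInstanceSnapshot.put(taskId, latestSyncInstance);
        }
    }

Note @12.1: read the cached keys of the previously synced nodes.
Note @12.2: iterate over the current nodes.
Note @12.3: if the cache is empty or does not contain this node, register the node with the target cluster.
Note @12.4: collect the key of every node that should be synced.
Note @12.5: update the cached node keys.

Summary: between two Nacos clusters, a sync task works at the Service (AppId) level. It registers a listener on the source cluster to obtain the list of registered nodes, removes the invalid nodes, and then registers the new nodes with the target cluster.

Executing the delete task

The code is in NacosSyncToNacosServiceImpl#delete.

    public boolean delete(TaskDO taskDO) {
        try {
            NamingService sourceNamingService =
                    nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getNameSpace());
            // Declaration restored by analogy with sync(); it is missing from the source listing.
            NamingService destNamingService =
                    nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());
            // Note @13
            sourceNamingService.unsubscribe(taskDO.getServiceName(),
                    getGroupNameOrDefault(taskDO.getGroupName()),
                    listenerMap.remove(taskDO.getTaskId()));
            sourceInstanceSnapshot.remove(taskDO.getTaskId());
            // Note @14
            List<Instance> sourceInstances = sourceNamingService.getAllInstances(
                    taskDO.getServiceName(),
                    getGroupNameOrDefault(taskDO.getGroupName()),
                    new ArrayList<>(), false);
            for (Instance instance : sourceInstances) {
                if (needSync(instance.getMetadata())) {
                    // Deregister the synced instance
                    destNamingService.deregisterInstance(taskDO.getServiceName(),
                            getGroupNameOrDefault(taskDO.getGroupName()),
                            instance.getIp(), instance.getPort());
                }
            }
        } catch (Exception e) {
            log.error("delete task from nacos to nacos was failed, taskId:{}",
                    taskDO.getTaskId(), e);
            metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
            return false;
        }
        return true;
    }

Note @13: cancel the source cluster subscription for the task (service).
Note @14: remove the synced instances from the target cluster.

Summary: the delete logic is simple. It cancels the subscription on the source cluster and deregisters the synced nodes from the target cluster.

6. Sync from a zk cluster to a Nacos cluster

Next, synchronization from a zk cluster to a Nacos cluster. The code is in ZookeeperSyncToNacosServiceImpl#sync().

    @Override
    public boolean sync(TaskDO taskDO) {
        try {
            if (treeCacheMap.containsKey(taskDO.getTaskId())) {
                return true;
            }
            // Note @1
            TreeCache treeCache = getTreeCache(taskDO);
            // Note @2
            NamingService destNamingService =
                    nacosServerHolder.get(taskDO.getDestClusterId(), null);
            // Note @3
            registerAllInstances(taskDO, destNamingService);
            // Note @4
            Objects.requireNonNull(treeCache).getListenable().addListener((client, event) -> {
                try {
                    String path = event.getData().getPath();
                    Map<String, String> queryParam = parseQueryString(path);
                    if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
                        processEvent(taskDO, destNamingService, event, path, queryParam);
                    }
                } catch (Exception e) {
                    // ...
                }
            });
        } catch (Exception e) {
            // ...
            metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
            return false;
        }
        return true;
    }

Note @1: watch the path "/dubbo" on the source zk cluster.
Note @2: obtain the NamingService of the target Nacos cluster.
Note @3: on the first run, register all instances.

    private void registerAllInstances(TaskDO taskDO, NamingService destNamingService)
            throws Exception {
        CuratorFramework zk = zookeeperServerHolder.get(taskDO.getSourceClusterId(), "");
        // Note @3.1
        if (!ALL_SERVICE_NAME_PATTERN.equals(taskDO.getServiceName())) {
            registerALLInstances0(taskDO, destNamingService, zk, taskDO.getServiceName());
        } else {
            // Note @3.2
            List<String> serviceList = zk.getChildren().forPath(DUBBO_ROOT_PATH);
            for (String serviceName : serviceList) {
                registerALLInstances0(taskDO, destNamingService, zk, serviceName);
            }
        }
    }

Note @3.1: sync the registered nodes of one specific (Dubbo) service.
Note @3.2: sync the nodes of every service found under the Dubbo root path in zk to Nacos.
Note @4: register a zk listener that syncs node additions, updates, and removals.

    private void processEvent(TaskDO taskDO, NamingService destNamingService,
            TreeCacheEvent event, String path, Map<String, String> queryParam)
            throws NacosException {
        if (!com.alibaba.nacossync.util.StringUtils.isDubboProviderPath(path)) {
            return;
        }
        Map<String, String> ipAndPortParam = parseIpAndPortString(path);
        Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
        String serviceName = queryParam.get(INTERFACE_KEY);
        switch (event.getType()) {
            case NODE_ADDED:
            case NODE_UPDATED:
                // Note @4.1
                destNamingService.registerInstance(
                        getServiceNameFromCache(serviceName, queryParam), instance);
                break;
            case NODE_REMOVED:
                // Note @4.2
                destNamingService.deregisterInstance(
                        getServiceNameFromCache(serviceName, queryParam),
                        ipAndPortParam.get(INSTANCE_IP_KEY),
                        Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
                nacosServiceNameMap.remove(serviceName);
                break;
            default:
                break;
        }
    }

Note @4.1: when a node is added or updated on the source cluster, register it with the target cluster.
Note @4.2: when a node is removed from the source cluster, deregister it from the target cluster.

Summary: from zk to Nacos, NacosSync supports only the dubbo path, but the code is a usable starting point for customization. It first syncs all nodes, then listens for path changes on the source cluster and syncs them to the target cluster.

This article is reprinted from the WeChat official account "瓜农老梁". To reprint it, please contact the 瓜农老梁 official account.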
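
As a supplement to section 5, the snapshot comparison behind notes @10 and @12 can be isolated into two set differences: keys that were in the previous snapshot but are gone from the source get deregistered, and keys that are in the source but not in the snapshot get registered. The following is a minimal sketch of that idea only. The class SnapshotDiffSketch and its methods staleKeys and freshKeys are hypothetical names that do not exist in NacosSync, the "ip:port" key format in the example values is an assumption, and the sketch uses plain java.util sets rather than the CollectionUtils helper seen in the listings.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of NacosSync: it isolates the set arithmetic
// that removeInvalidInstance (note @10) and syncNewInstance (note @12) rely on.
final class SnapshotDiffSketch {

    private SnapshotDiffSketch() {
    }

    // Keys in the previous snapshot that no longer appear in the source.
    // In the article's flow these instances are deregistered from the target.
    static Set<String> staleKeys(Set<String> snapshot, Collection<String> current) {
        Set<String> stale = new LinkedHashSet<>();
        if (snapshot == null) {
            return stale; // no snapshot yet, so nothing can be stale
        }
        stale.addAll(snapshot);
        stale.removeAll(current);
        return stale;
    }

    // Keys in the source that the previous snapshot does not contain.
    // In the article's flow these instances are registered with the target.
    static Set<String> freshKeys(Set<String> snapshot, Collection<String> current) {
        Set<String> fresh = new LinkedHashSet<>(current);
        if (snapshot != null) {
            fresh.removeAll(snapshot); // an absent snapshot means every key is new
        }
        return fresh;
    }
}
```

For example, with a snapshot of {"10.0.0.1:8080", "10.0.0.2:8080"} and a current list of ["10.0.0.2:8080", "10.0.0.3:8080"], staleKeys returns {"10.0.0.1:8080"} and freshKeys returns {"10.0.0.3:8080"}. Keeping the two differences separate mirrors the order in the listings, where removal runs before registration.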