当前位置: 首页 > 科技观察

Nacos客服订阅事件机制解析

时间:2023-03-19 11:47:03 科技观察

转载请联系程序新石公众号。学习不需要那么功利。二哥从更高的维度带你轻松阅读源码。上一篇我们分析了Nacos客户端订阅的核心流程:Nacos客户端通过定时任务每6秒从注册中心获取实例列表。当发现实例发生变化时,发布变化事件,订阅者进行业务处理,然后更新内存和本地缓存中的实例。本文为服务订阅的第二篇文章。我们着重分析整个事件机制在获取到最新的实例列表后,是如何处理定时任务的。回顾整个过程首先回顾一下客户端订阅的基本过程:第一步调用subscribe方法时,会订阅一个EventListener事件。定时任务UpdateTask定时获取实例列表后,会调用ServiceInfoHolder#processServiceInfo方法在本地处理ServiceInfo,包括事件处理。监听事件的注册在subscribe方法中的注册方式如下:@Overridepublicvoidsubscribe(StringserviceName,StringgroupName,Listclusters,EventListenerlistener)throwsNacosException{if(null==listener){return;}StringclusterString=StringUtils.join(clusters,",");changeNotifier.registerListener(groupName,serviceName,clusterString,listener);clientProxy.subscribe(serviceName,groupName,clusterString);}这里的changeNotifier.registerListener就是具体的事件注册逻辑。追进去看看实现源码://InstancesChangeNotifierpublicvoidregisterListener(StringgroupName,StringserviceName,Stringclusters,EventListenerlistener){Stringkey=ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName,groupName),clusters);ConcurrentHashSeteventListeners=listenerMap.get(key);if(eventListeners==null){synchronized(lock){eventListeners=listenerMap.get(key);if(eventListeners==null){eventListeners=newConcurrentHashSet();//缓存EventListenertolistenerMaplistenerMap.put(key,eventListeners);}}}eventListeners.add(listener);}可以看出事件的注册是将EventListener存放在InstancesChangeNotifier的listenerMap属性中。这里的数据结构是一个Map,key是服务实例信息的拼接,value是监听事件的集合。活动注册过程就这么简单。这是双重检查锁的实际案例。不知道大家有没有注意到呢?你可以学习它。ServiceInfo的处理完成了事件的注册,现在我们可以追溯触发事件的来源了。UpdateTask中获取的最新实例会进行本地化,部分代码如下://获取缓存的服务信息ServiceInfoserviceObj=serviceInfoHolder.getServiceInfoMap().get(serviceKey);if(serviceObj==null){//根据serviceName进行注册中心服务器获取Service信息serviceObj=namingClientProxy.queryInstancesOfService(serviceName,groupName,clusters,0,false);serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime=serviceObj.getLastRefTime();return;}这部分逻辑已经覆盖经过上一篇文章的分析,这里重点介绍serviceInfoHolder#processServiceInfo中的业务逻辑处理。先看流程图,再看代码。上面的逻辑很简单:判断新的ServiceInfo数据是否正确,是否有变化。如果数据格式正确,发生变化,则发布InstancesChangeEvent事件,将ServiceInfo写入本地缓存。我们看一下代码实现:serviceInfo)){//emptyorerorpush,justignorereturnoldService;}//缓存服务信息serviceInfoMap.put(serviceInfo.getKey(),serviceInfo);//判断注册实例信息是否发生变化booleanchanged=isChangedServiceInfo(oldService,serviceInfo);if(StringUtils.isBlank(serviceInfo.getJsonFromServer()){serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}//通过prometheus-simpleclientMetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()));//服务实例改变了serviceInfo.getHosts()));//添加一个实例cechange事件,会推送给订阅者执行NotifyCenter.publishEvent(newInstancesChangeEvent(serviceInfo.getName(),serviceInfo.getGroupName(),serviceInfo.getClusters(),serviceInfo.getHosts()));//记录Service本地文件DiskCache.write(serviceInfo,cacheDir);}returnserviceInfo;}看流程图和注释部分就明白了代码中我们要重点关注的流程是服务信息变更后释放的InstancesChangeEvent,也就是流程图中用红色标注的部分。事件跟踪以上事件都是通过NotifyCenter发布的。NotifyCenter中的核心流程如下:事件在NotifyCenter中发布。发布的核心逻辑是:根据InstancesChangeEvent事件的类型,获取对应的CanonicalName;CanonicalName作为Key,从NotifyCenter#publisherMap获取对应的事件发布者(EventPublisher);EventPublisher发布InstancesChangeEvent事件。NotifyCenter中的核心代码实现如下:privatestaticbooleanpublishEvent(finalClasseventType,finalEventevent){if(ClassUtils.isAssignableFrom(SlowEvent.class,eventType)){returnINSTANCE.sharePublisher.publish(event);}//根据到InstancesChangeEvent事件类型,获取对应的CanonicalName;finalStringtopic=ClassUtils.getCanonicalName(eventType);//以CanonicalName为Key从NotifyCenter#publisherMap获取对应的事件发布者(EventPublisher);EventPublisherpublisher=INSTANCE.publisherMap.get(主题);if(publisher!=null){//EventPublisher发布InstancesChangeEvent事件。returnpublisher.publish(event);}LOGGER.warn("Thereareno[{}]publishersforthisevent,pleaseregister",topic);returnfalse;}上面代码中的INSTANCE是NotifyCenter的单例模式实现。那么,这里的publisherMap中key(CanonicalName)和value(EventPublisher)的关系是什么时候建立的呢?这是在实例化NacosNamingService时调用的init方法中绑定的://Publisher的注册过程就是建立InstancesChangeEvent.class和EventPublisher的关系。NotifyCenter.registerToPublisher(InstancesChangeEvent.class,16384);registerToPublisher方法默认使用DEFAULT_PUBLISHER_FACTORY来构建。publicstaticEventPublisherregisterToPublisher(finalClasseventType,finalintqueueMaxSize){returnregisterToPublisher(eventType,DEFAULT_PUBLISHER_FACTORY,queueMaxSize);}如果您检查NotifyCenter中的静态代码块,您会发现DEFAULT_PUBLISHER_FACTORY构建了默认的EventPublisher。至此,我们知道在NotifyCenter中,它维护着事件名称和事件发布者的关系,默认的事件发布者是DefaultPublisher。DefaultPublisher的事件发布查看DefaultPublisher的源码,会发现它继承自Thread,也就是说它是一个线程类。同时它实现了EventPublisher,也就是我们前面提到的发布者。publicclassDefaultPublisherextendsThreadimplementsEventPublisher{}在DefaultPublisher的init方法中实现如下:@Overridepublicvoidinit(Classtype,intbufferSize){//DaemonthreadsetDaemon(true);//设置线程名称setName("nacos.publisher-"+type.getName());this.eventType=type;this.queueMaxSize=bufferSize;//阻塞队列初始化this.queue=newArrayBlockingQueue<>(bufferSize);start();}也就是说在初始化DefaultPublisher的时候,它被守护以线程的形式运行,并初始化一个阻塞队列,队列默认大小为16384。最后调用start方法:@Overridepublicsynchronizedvoidstart(){if(!initialized){//startjustcalledoncesuper.start();if(queueMaxSize==-1){queueMaxSize=ringBufferSize;}initialized=true;}}start方法调用super.start,此时相当于启动线程,对应run方法将被执行。在run方法中,只调用了下面的方法:voidopenEventHandler(){try{//定义这个变量是为了解决队列中消息积压的问题。intwaitTimes=60;//无限循环不断的取出Event从队列中取出,通知Subscriber执行Event//为了保证消息不丢失,启用EventHandlerwhen//等待第一个Subscriber注册为(;;){if(shutdown||hasSubscriber()||waitTimes<=0){break;}ThreadUtils.sleep(1000L);waitTimes--;}for(;;){if(shutdown){break;}////检索EventfinalEventevent=queue.take();receiveEvent(event);UPDATER.compareAndSet(this,lastEventSequence,Math.max(lastEventSequence,event.sequence()));}}catch(Throwableex){记录器。error("Eventlistenerexception:",ex);}}这里写了两个死循环。第一个死循环可以理解为延时效果,也就是说线程启动时最大延时为60秒。在这60秒内,每隔1秒判断当前线程是否关闭,是否有订阅者,是否超过60秒。如果满足一个条件,可以提前跳出死循环。第二个死循环才是真正的业务逻辑处理,会从阻塞队列中取出一个事件,通过receiveEvent方法执行。那么,队列中的事件是从哪里来的呢?此时你可能会想到刚才调用了DefaultPublisher的release事件方法。我们看一下它的publish方法实现:@Overridepublicbooleanpublish(Eventevent){checkIsStart();booleansuccess=this.queue.offer(event);if(!success){LOGGER.warn("Unabletopluginduetointerruption,synchronizesendingtime,event:{}",event);receiveEvent(event);returntrue;}returntrue;}可以看到DefaultPublisher的publish方法确实是将事件存储到阻塞队列中。这里有一个分支逻辑。如果入金失败,则直接调用receiveEvent,与从队列中取出事件的方法相同。可以这样理解,如果存入队列失败,则立即执行,不会走队列。最后,我们看一下receiveEvent方法的实现:voidreceiveEvent(Eventevent){finallongcurrentEventSequence=event.sequence();if(!hasSubscriber()){LOGGER.warn([NotifyCenter]the{}丢失,因为没有订阅者。");return;}//通知订阅者执行Event//Notificationsingleeventlistenerfor(Subscribersubscriber:subscribers){//是否忽略expirationeventsif(subscriber.ignoreExpireEvent()&&lastEventSequence>currentEventSequence){LOGGER.debug([NotifyCenter]the{}isunacceptabletothissubscriber,getClass());continue;}//因为统一了smartSubscriber和subscriber,所以需要考虑兼容性。//去掉原来的判断部分codes.notifySubscriber(subscriber,event);}}这里的主要逻辑是遍历DefaultPublisher的订阅者(订阅者集合),然后执行通知订户的方法。那么有朋友想问下订阅者中的订阅者是从哪里来的呢?这也回到NacosNamingService的init方法://RegisterSubscribetoPublisherNotifyCenter.registerSubscriber(changeNotifier);该方法最终会调用NotifyCenter的addSubscriber方法:privatestaticvoidaddSubscriber(finalSubscriberconsumer,ClasssubscribeType,EventPublisherFactoryfactory){finalStringtopic=ClassUtils.getCanonicalName(subscribeType);synchronized(NotifyCenter.class){//MapUtils.computeIfAbsentisaunsafemethod.MapUtil.computeIfAbs(INSTANCE.publisherMap,topic,factory,subscribeType,ringBufferSize);}//获取对应的PublisherEventPublisherpublisher=INSTANCE.publisherMap.get(topic);if(publisherinstanceofShardedEventPublisher){((ShardedEventPublisher)publisher).addSubscriber(consumer,subscribeType);}else{//添加到订阅者集合publisher.addSubscriber(consumer);}}核心逻辑是绑定订阅事件、发布者和订阅者。发布者和事件通过Maps维护,发布者和订阅者通过关联关系维护。发布者找到了,事件也有了。最后看notifySubscriber方法:@OverridepublicvoidnotifySubscriber(finalSubscribersubscriber,finalEventevent){LOGGER.debug([NotifyCenter]the{}willreceivedby{}",event,subscriber);//执行订阅者EventfinalRunnablejob=()->subscriber.onEvent(事件);finalExecutorexecutor=subscriber.executor();if(executor!=null){executor.execute(job);}else{try{job.run();}catch(Throwablee){LOGGER.error("Eventcallbackexception:",e);}}}逻辑比较简单。如果订阅者定义了一个Executor,那么就使用它定义的Executor来执行事件。如果没有,创建线程执行。至此,整个服务订阅的事件机制就完成了。总结整体来说,整个服务订阅的事件机制还是比较复杂的,因为在使用事件形式的时候逻辑比较绕,这期间还混杂了守护线程、死循环、阻塞队列等时期。需要重点理解NotifyCenter维护事件发布者、事件订阅者和事件之间的关系,而维护这种关系的入口位于NacosNamingService的init方法中。我们梳理几个核心流程:InstancesChangeEvent事件通过ServiceInfoHolder中的NotifyCenter发布;事件在NotifyCenter发布,发布的核心逻辑是:根据InstancesChangeEvent事件的类型,获取对应的CanonicalName;CanonicalName作为Key,从NotifyCenter#publisherMap获取对应的事件发布者(EventPublisher);EventPublisher发布InstancesChangeEvent事件。InstancesChangeEvent事件的发布:通过EventPublisher的实现类DefaultPublisher发布InstancesChangeEvent事件;DefaultPublisher本身作为守护线程运行,在执行业务逻辑之前,首先判断线程是否启动;如果启动,则将事件添加到BlockingQueue,队列默认大小为16384;如果成功加入BlockingQueue,则整个发布过程完成;添加失败则直接调用DefaultPublisher#receiveEvent方法接收事件并通知订阅者;当通知订阅者时,会创建一个Runnable对象来执行订阅者的事件。Event事件是订阅执行时传入的事件;如果成功加入到BlockingQueue中,会遵循另外一个业务逻辑:DefaultPublisher在初始化时会创建一个阻塞(BlockingQueue)队列,并标记线程启动;DefaultPublisher本身是一个Thread,当执行super.start方法时,会调用它的run方法;run方法的核心业务逻辑是通过openEventHandler方法处理的;openEventHandler方法通过两次for循环从阻塞队列中获取时间信息;第一个for循环用于让线程启动时检查60s内的执行情况;第二个for循环是无限循环,从阻塞队列中获取Event,调用DefaultPublisher#receiveEvent方法接收事件并通知订阅者;Event事件是订阅执行时传入的Event;NacosClient服务定义了非常多的事件机制。在下一篇文章中,我们将讨论故障转移和缓存的实现。