当前位置: 首页 > 科技观察

Redis源码学习之事件驱动_0

时间:2023-03-12 21:02:39 科技观察

Redis基于多路复用技术实现了一套简单的事件驱动库。代码在ae.h、ae.c和ae_epoll.c、ae_evport.c和ae_kqueue.c、ae_select。c在这些文件中。其中ae表示antirezeventloop。Redis包含两种事件类型:FileEvent和TimeEvent。Redis使用了IO多路复用技术,所有的事件都在一个线程中处理。Redis的事件驱动模型可以用如下代码表示:intmain(intargc,char**argv){while(true){//等待事件到达:wait4Event();//处理事件:processEvent()}}在一个无限循环中等待事件的到来,然后处理事件,以此类推。这是最经典的网络编程模型之一。1.基本数据结构aeEventLoopaeEventLoop是Redis中事件驱动模型的核心,封装了整个事件循环,各字段解释如下:maxfd:已接受的最大文件描述符。setsize:当前循环可容纳的文件描述符个数。timeEventNextId:下一次事件的ID。lastTime:上次访问的时间,用于检测系统时钟是否被修改。events:指向保存所有注册事件的数组首地址的指针。fired:指针,保存所有触发事件数组的首地址。timeEventHead:Redis使用一个链表来存储所有的时间事件,timeEventHead是一个指向链表首节点的指针。stop:停止整个事件循环。apiData:指针,指向epoll结构。beforeSleep:函数指针。每次执行循环时,都会在阻塞之前调用这个函数,直到时间到了。aeFileEvent和aeTimeEvent这两个结构体分别表示文件事件和时间事件,定义如下://函数指针,读取事件处理void*clientData;//具体数据}aeFileEvent;其中mask表示文件事件类型掩码,可以是AE_READABLE表示可读事件,AE_WRITABLE表示可写事件。aeFileProc是一个函数指针。/*Timeeventstructure*/typedefstructaeTimeEvent{longlongid;//事件IDlongwhen_sec;//事件触发时间:slongwhen_ms;//事件触发时间:msaeTimeProc*timeProc;//函数指针aeEventFinalizerProc*finalizerProc;//函数指针:在对应的Called之前aeTieEvent节点被删除,可以理解为aeTimeEvent的析构函数void*clientData;//指向具体数据的指针structaeTimeEvent*next;//指向下一个时间事件的指针}aeTimeEvent;aeFiredEventeFiredEvent结构表示一个触发事件Event,结果如下:/*Afiredevent*/typedefstructaeFiredEvent{intfd;//触发事件的文件描述符intmask;//触发事件的掩码,表示触发事件的类型触发事件}aeFiredEvent;fd表示事件发生在哪个文件描述符上面,mask用来表示具体事件的类型。aeApiStateRedis底层使用IO多路复用技术实现高并发,具体实现可以使用kqueue、select、epoll等技术。对于Linux,epoll的性能要优于select,所以以epoll为例进行分析。typedefstructaeApiState{intepfd;structepoll_event*events;}aeApiState;aeApiState封装了epoll相关的数据,epfd保存了epoll_create()返回的文件描述符。具体实现细节事件循环启动:aeMain()事件驱动的启动代码位于ae.c的aeMain()函数中,代码如下:从aeMain()方法可以看出,整个事件驱动在一个while()循环中不断的执行aeProcessEvents()方法,在这个方法中执行客户端发送过来的请求。初始化:aeCreateEventLoop()aeEventLoop的初始化是在aeCreateEventLoop()方法中进行的,在server.c中的initServer()中调用。实现如下:在该方法中,主要是为aeEventLoop对象分配内存,然后对其进行初始化。关键点是:1.调用aeApiCreate()初始化epoll相关数据。aeApiCreate()的实现如下:在aeApiCreate()方法中,主要做了以下三件事:分配aeApiState结构体需要的内存。调用epoll_create()方法生成一个epoll文件描述符,保存在aeApiState.epfd字段中。在EventLoop->apidata字段中保存第一步分配的aeApiState的内存地址。2.将事件中的掩码字段初始化为AE_NONE。生成fileEvent:aeCreateFileEvent()Redis使用aeCreateFileEvent()来生成fileEvent,代码如下:aeCreateFileEvent()方法主要做了以下三件事:检查新添加的fd是否超过了可以容纳的最大容量。调用aeApiAddEvent()方法将相应的fd添加到mask模式的epoll监听器中。设置相应的字段值。最关键的是第二步。aeApiAddEvent()方法如下:生成timeEvent:aeCreateTimeEvent()aeCreateTimeEvent()方法主要用于生成timeEvent节点,实现比较简单。代码如下:processtimeEevnt:processTimeEvents()Redis在processTimeEvents()方法中处理所有的timeEvents,如下:staticintprocessTimeEvents(aeEventLoop*eventLoop){intprocessed=0;aeTimeEvent*te,*prev;longlongmaxId;time_tnow=time(NULL);/***如果系统时间被调整为以后的某个时间,然后重新设置回正确的时间。*在这种情况下,链表中的timeEvent可能会随机延迟,所以在这种情况下,将所有timeEvents的触发时间设置为0表示执行*/if(nowlastTime){te=eventLoop->timeEventHead;while(te){te->when_sec=0;te=te->next;}}eventLoop->lastTime=now;//设置最后运行时间为nowprev=NULL;te=eventLoop->timeEventHead;maxId=eventLoop->timeEventNextId-1;while(te){longnow_sec,now_ms;longlongid;/***delete已经被标志时间事件删除*/if(te->id==AE_DELETED_EVENT_ID){aeTimeEvent*next=te->next;if(prev==NULL)eventLoop->timeEventHead=te->next;elseprev->next=te->next;if(te->finalizerProc)//在时间事件之前调用finlizerProce()方法节点被删除te->finalizerProc(eventLoop,te->clientData);zfree(te);te=next;continue;}if(te->id>maxId){/***te->id>maxId表示当前te指向的timeEvent是在当前新加入的loop,*新加入的节点不在这个循环中处理*PS:为什么会这样?有没有可能会在timeProc()中注册一个新的timeEvent节点?*对于目前的Redis版本,不会出现te->id>maxId的情况*/te=te->next;continue;}aeGetTime(&now_sec,&now_ms);if(now_sec>te->when_sec||(now_sec==te->when_sec&&now_ms>=te->when_ms)){//如果当前时间已经超过对应timeEvent节点设置的触发时间,//则调用timeProc()方法执行对应任务intretval;id=te->id;retval=te->timeProc(eventLoop,id,te->clientData);processed++;if(retval!=AE_NOMORE){//要执行更多次,则计算下一次执行时间aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);}else{//如果只需要执行一次,将id设置为-1,在下一个循环中删除te->id=AE_DELETED_EVENT_ID;}}prev=te;te=te->next;}returnprocessed;}这个方法中会判断系统时间是否调整过。触发时间设置为0,即立即执行。遍历timeEvent链表,对于每个timeEvent节点,如果有:返回AE_NOMORE,说明当前timeEvent节点属于一次性事件,标记节点ID为AE_DELETED_EVENT_ID,说明该节点被删除,该节点将在下一个循环delete中被删除。如果返回不是AE_NOMORE,说明当前timeEvent节点是一个周期事件,需要执行多次。调用aeAddMillisecondsToNow()方法设置下一次执行时间。如果已经被标记为删除(AE_DELETED_EVENT_ID),则立即释放对应节点的内存,遍历下一个节点。如果id大于maxId,说明当前节点是本次循环中新加入的节点,所以本次循环可以处理好,继续下一个节点。如果当前节点的触发时间大于当前时间,则调用对应节点的timeProc()方法执行任务。根据timeProc()方法的返回,可以分为两种情况:处理所有事件:aeProcessEvents()Redis中的所有事件,包括timeEvent和fileEvent,都在aeProcessEvents()方法中处理。Theimplementationofthejustmethodisasfollows:/*Processeverypendingtimeevent,theneverypendingfileevent*(thatmayberegisteredbytimeeventcallbacksjustprocessed).*Withoutspecialflagsthefunctionsleepsuntilsomefileevent*fires,orwhenthenexttimeeventoccurs(ifany).**Ifflagsis0,thefunctiondoesnothingandreturns.*ifflagshasAE_ALL_EVENTSset,allthekindofeventsareprocessed.*ifflagshasAE_FILE_EVENTSset,fileeventsareprocessed.*ifflagshasAE_TIME_EVENTSset,timeeventsareprocessed.*ifflagshasAE_DONT_WAITsetthefunctionreturnsASAPuntilall*theeventstthat'spossibletoprocesswithouttowaitareprocessed.**Thefunctionreturnsthenumberofeventsprocessed.*/intaeProcessEvents(aeEventLoop*eventLoop,intflags){intprocessed=0,numevents;/***如果没有时间事件或文件事件,直接返回*/if(!(标志&AE_TIME_EVENTS)&&!(标志&AE_FILE_EVENTS))return0;/***-1==eventloop->maxfd表示任何aeFileEvent已经加入epoll*在事件循环中监听*/if(eventLoop->maxfd!=-1||((flags&AE_TIME_EVENTS)&&!(flags&AE_DONT_WAIT))){intj;aeTimeEvent*shortest=NULL;structtimevaltv,*tvp;/***如果有如果aeFileEvent需要处理,首先要从所有未决的*aeTimeEvent事件*中找到最近的aeTimeEvent节点执行,并结算节点触发时间*/if(flags&AE_TIME_EVENTS&&!(flags&AE_DONT_WAIT))shortest=aeSearchNearestTimer(eventLoop);if(shortest){longnow_sec,now_ms;aeGetTime(&now_sec,&now_ms);tvp=&tv;/*Howmanymillisecondsweneedtowaitforthenext*timeeventtofire?*///计算epoll_wait()的等待时间longlongms=(shortest->when_sec-now_sec)*1000+最短->when_ms-now_ms;if(ms>0){tvp->tv_sec=ms/1000;tvp->tv_usec=(ms%1000)*1000;}else{tvp->tv_sec=0;tvp->tv_usec=0;}}else{//如果flags设置了AE_DONT_WAIT,则设置epoll_wait()等待时间为0,//即立即从epoll返回if(flags&AE_DONT_WAIT){tv.tv_sec=tv.tv_usec=0;tvp=&tv;}else{/*Otherwisewecanblock*/tvp=NULL;/*waitforever*/}}//调用aeApiPoll()阻塞等待事件的到来,等待时间为tvpnumevents=aeApiPoll(eventLoop,tvp);for(j=0;j<;numevents;j++){aeFileEvent*fe=&eventLoop->events[eventLoop->fired[j].fd];intmask=eventLoop->fired[j].mask;intfd=eventLoop->fired[j].fd;intrfired=0;/*notethefe->mask&mask&...code:maybeanalreadyprocessed*eventremovedanelementthatfiredandwestilldidn't*processed,sowecheckiftheeventistillvalid.*///fe->mask&&mask是保证对应的事件仍然有效if(fe->mask&mask&AE_READABLE){rfired=1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}if(fe->mask&mask&AE_WRITABLE){if(!rfired||fe->wfileProc!=fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}/*Checktimeevents*/if(flags&AE_TIME_EVENTS)//处理aeTimeEventprocessed+=processTimeEvents(eventLoop);returnprocessed;/*returnthenumberofprocessedfile/timeevents*/}该方法入参flag表示处理哪些事件,可以取以下值:AE_ALL_EVENTS:timeEvent和fileEvent都会处理AE_FILE_EVENTS:只处理fileEvent。AE_TIME_EVENTS:只处理timeEvent。AE_DONT_WAIT:要么立即返回,要么处理完那些不需要等待的事件后立即返回。aeProcessEvents()方法会做如下事情:判断传入的flag的值,如果既不包含AE_TIME_EVENTS也不包含AE_FILE_EVENTS则直接返回。计算是否有aeFileEvent事件需要处理,先计算epoll_wait()方法需要阻塞等待的时间。计算方法如下:先从aeTimeEvent事件列表中找到距离最近的需要触发的aeTimeEvent节点,计算需要触发的时间。触发的时间就是epoll_wait()的等待时间。如果没有找到最近的aeTimeEvent节点,说明没有aeTimeEvent节点加入链表,然后判断传入的flags是否包含AE_DONT_WAIT选项,然后设置epoll_wait()的等待时间为0,即立即返回.如果没有设置AE_DONT_WAIT,则将需要的等待时间设置为NULL,表示epoll_wait()已经被阻塞等待知道一个fileEvent事件已经到来。调用aeApiPoll()方法阻塞等待事件的到来,阻塞时间为第二步计算的时间。aeApiPoll()的实现见文末:aeApiPoll()会做以下事情:根据传入的tvp计算出阻塞的时间,然后调用epoll_wait()等待阻塞。事件到达后,首先计算对应事件的类型。将事件发生的fd和对应的类型掩码复制到fired数组中。从aeApiPoll()方法返回后,eventLoop->fired[]数组中已经保存了所有准备好事件的fd和对应事件的类型掩码。依次遍历fired数组,根据掩码类型执行相应的frileProc()或wfileProce()方法。如果传入的flags中有AE_TIME_EVENTS,则调用processTimeEvents()执行所有已经过期的timeEvents。Redis事务本系列Redis源码学习