当前位置: 首页 > 后端技术 > PHP

Swoole源码分析——Linux中创建Reactor模块的ReactorEpoll

时间:2023-03-29 23:09:19 PHP

epoll对象,最高效的reactor机制是epoll。swReactor的对象会存放epoll的对象swReactorEpoll_s。在这个数据结构中,epfd是epoll的id,events在epoll_wait函数中用于接收就绪事件。该任数最重要的是epoll_create,该任数会创建epoll对象typedefstructswReactorEpoll_sswReactorEpoll;structswReactorEpoll_s{intepfd;structepoll_event*events;};intswReactorEpoll_create(swReactor*reactor,intmax_event_num){//创建反应器对象swReactorEpoll*reactor_object=sw_malloc(sizeof(swReactorEpoll));if(reactor_object==NULL){swWarn("malloc[0]失败。");返回SW_ERR;}bzero(reactor_object,sizeof(swReactorEpoll));reactor->object=reactor_object;reactor->max_event_num=max_event_num;reactor_object->events=sw_calloc(max_event_num,sizeof(structepoll_event));if(reactor_object->events==NULL){swWarn("malloc[1]失败。");sw_free(reactor_object);返回SW_ERR;}//epoll创建reactor_object->epfd=epoll_create(512);if(reactor_object->epfd<0){swWarn("epoll_createfailed.Error:%s[%d]",strerror(errno),errno);sw_free(reactor_object);返回SW_ERR;}//绑定方法reactor->add=swReactorEpoll_add;reactor->set=swReactorEpoll_set;reactor->del=swReactorEpoll_del;reactor->wait=swReactorEpoll_wait;;returnSW_OK;}epoll增加monitorswReactorEpoll_event_set函数,将可读(SW_EVENT_READ)和可写(SW_EVENT_WRITE)状态转换为EPOLLIN、EPOLLOUT、EPOLLERR可供epoll函数使用sw_inlineintswReactorEpoll_event_set(intfdtype){uint32_tflag=0;(swReactor_event_read(fdtype)){flag|=EPOLLIN;}if(swReactor_event_write(fdtype)){flag|=EPOLLOUT;}if(swReactor_event_error(fdtype)){//标志|=(EPOLRDHUP);标志|=(EPOLLRDHUP|EPOLLHUP|EPOLLERR);}returnflag;}swReactorEpoll_add函数用于为reactor添加一个新的文件描述符来监控和添加fd。最重要的是使用epoll_ctl函数的EPOLL_CTL_ADD命令为了在调用epoll_wait后更容易获取到fd的类型,不是仅仅在epoll_ctl函数中添加fd,而是添加swFd类型,swFd类型包含文件描述符和文件类型。swReactor_add函数用于更新reactor->socket_list的fdtype与events最后需要自增event_num的数值typedefstruct_swFd{uint32_tfd;uint32_tfdtype;}swFd;staticintswReactorEpoll_add(swReactor*reactor,intfd,intfdtype){swReactorEpoll*object=reactor->object;}结构epoll_evente;swFdfd_;bzero(&e,sizeof(structepoll_event));fd_.fd=fd;fd_.fdtype=swReactor_fdtype(fdtype);e.events=swReactorEpoll_event_set(fdtype);swReactor_add(反应堆,fd,fdtype);memcpy(&(e.data.u64),&fd_,sizeof(fd_));if(epoll_ctl(object->epfd,EPOLL_CTL_ADD,fd,&e)<0){swSysError("添加事件[fd=%d#%d,type=%d,events=%d]失败。",fd,reactor->id,fd_.fdtype,e.events);swReactor_del(反应器,fd);返回SW_ERR;}swTraceLog(SW_TRACE_EVENT,"添加事件[reactor_id=%d,fd=%d,events=%d]",reactor->id,fd,swReactor_events(fdtype));反应器->event_num++;返回SW_OK;}staticsw_inlinevoidswReactor_add(swReactor*reactor,intfd,inttype){swConnection*socket=swReactor_get(reactor,fd);socket->fdtype=swReactor_fdtype(类型);套接字->事件=swReactor_events(类型);socket->removed=0;}Epoll修改监听修改监听主要调用epoll_ctl的EPOLL_CTL_MOD命令staticintswReactorEpoll_set(swReactor*reactor,intfd,intfdtype){swReactorEpoll*object=reactor->object;swFdfd_;结构epoll_evente;诠释;bzero(&e,sizeof(structepoll_event));e.events=swReactorEpoll_event_set(fdtype);如果(e.events&EPOLLOUT){断言(fd>2);}fd_.fd=fd;fd_.fdtype=swReactor_fdtype(fdtype);memcpy(&(e.data.u64),&fd_,sizeof(fd_));ret=epoll_ctl(object->epfd,EPOLL_CTL_MOD,fd,&e);if(ret<0){swSysError("reactor#%d->set(fd=%d|type=%d|events=%d)失败。",reactor->id,fd,fd_.fdtype,e.事件);返回SW_ERR;}swTraceLog(SW_TRACE_EVENT,"setevent[reactor_id=%d,fd=%d,events=%d]",reactor->id,fd,swReactor_events(fdtype));//执行父方法swReactor_set(reactor,fd,fdtype);returnSW_OK;}epolldeletelistenermodifylistener主要是调用epoll_ctlEPOLL_CTL_DEL命令最后需要更新event_numstaticintswReactorEpoll_del(swReactor*reactor,intfd){swReactorEpoll*object=reactor->object;if(epoll_ctl(object->epfd,EPOLL_CTL_DEL,fd,NULL)<0){swSysError("epollremovefd[%d#%d]failed.",fd,reactor->id);返回SW_ERR;}swTraceLog(SW_TRACE_REACTOR,"移除事件[reactor_id=%d|fd=%d]",reactor->id,fd);reactor->event_num=reactor->event_num<=0?0:反应器->event_num-1;swReactor_del(反应器,fd);returnSW_OK;}epoll监听等待就绪swReactorEpoll_wait是reactor的核心。这个函数最重要的是调用了epoll_wait。首先需要通过timeo参数设置msec,使用object->events设置eventspoll_wait。函数返回后,如果n<0,需要先检查erron。如果EINTR,则表示有信号触发。这时候需要执行信号的回调函数,然后继续事件循环如果不是EINTR,则返回错误并结束事件循环。如果n==0,一般是因为epoll_wait超时了。这时需要调用超时回调函数。如果n>0,则必须从events中获取就绪的swFd对象,并使用该对象的值来初始化事件。接下来,我们需要检查events[i]的值。断开连接,这是linux从2.6.17开始的新特性。使用swReactor_getHandle函数取出对应的文件描述符类型事件回调函数。事件循环中最后一次调用onFinish函数。如果设置了once,表示reactor只循环一次,立即退出;否则继续事件循环typedefstruct_swEvent{intfd;int16_t来自_id;uint8_t类型;swConnection*socket;}swEvent;staticintswReactorEpoll_wait(swReactor*reactor,structtimeval*timeo){swEvent事件;swReactorEpoll*object=reactor->object;swReactor_handle句柄;inti,n,ret,毫秒;intreactor_id=reactor->id;intepoll_fd=object->epfd;intmax_event_num=reactor->max_event_num;structepoll_event*events=object->events;if(reactor->timeout_msec==0){if(timeo==NULL){reactor->timeout_msec=-1;}别的{reactor->timeout_msec=timeo->tv_sec*1000+timeo->tv_usec/1000;}}反应堆->开始=1;while(reactor->running>0){if(reactor->onBegin!=NULL){reactor->onBegin(reactor);}msec=reactor->timeout_msec;n=epoll_wait(epoll_fd,events,max_event_num,msec);if(n<0){if(swReactor_error(reactor)<0){swWarn("[Reactor#%d]epoll_wait失败。错误:%s[%d]",reactor_id,strerror(errno),errno);返回SW_ERR;}else{继续;}}elseif(n==0){if(reactor->onTimeout!=NULL){reactor->onTimeout(reactor);}继续;}for(i=0;i>32;event.socket=swReactor_get(reactor,event.fd);//读取if((events[i].events&EPOLLIN)&&!event.socket->removed){handle=swReactor_getHandle(reactor,SW_EVENT_READ,event.type);ret=handle(reactor,&event);if(ret<0){swSysError("EPOLLIN句柄失败。fd=%d。",event.fd);}}//writeif((events[i].events&EPOLLOUT)&&!event.socket->removed){handle=swReactor_getHandle(reactor,SW_EVENT_WRITE,event.type);ret=handle(reactor,&event);if(ret<0){swSysError("EPOLLOUT句柄失败。fd=%d。",event.fd);}}//error#ifndefNO_EPOLLRDHUPif((events[i].events&(EPOLLRDHUP|EPOLLERR|EPOLLHUP))&&!event.socket->removed)#elseif((events[i].events&(EPOLLERR|EPOLLHUP)))&&!event.socket->removed)#endif{//忽略ERR和HUP,因为事件已经在IN和OUT处理程序中处理。if((events[i].events&EPOLLIN)||(events[i].events&EPOLLOUT)){继续;}handle=swReactor_getHandle(reactor,SW_EVENT_ERROR,event.type);ret=handle(reactor,&event);if(ret<0){swSysError("EPOLLERR句柄失败。fd=%d。",event.fd);}}}if(reactor->onFinish!=NULL){reactor->onFinish(reactor);}如果(反应器->一次){休息;}}return0;}staticsw_inlineintswReactor_error(swReactor*reactor){switch(errno){caseEINTR:if(reactor->singal_no){swSignal_callback(reactor->singal_no);反应堆->singal_no=0;}返回SW_OK;}returnSW_ERR;}staticsw_inlineswReactor_handleswReactor_getHandle(swReactor*reactor,intevent_type,intfdtype){if(event_type==SW_EVENT_WRITE){return(reactor->write_handle[fdtype]!=NULL)?reactor->write_handle[fdtype]:reactor->handle[SW_FD_WRITE];}elseif(event_type==SW_EVENT_ERROR){return(reactor->error_handle[fdtype]!=NULL)?reactor->error_handle[fdtype]:reactor->handle[SW_FD_CLOSE];}returnreactor->handle[fdtype];}