最近在看UNIX网络编程,研究Redis的实现。感觉Redis的源码很适合阅读和分析。I/O多路复用(multiplexing)部分的实现非常干净优雅。这里我想简单的梳理一下这部分的内容。几种I/O模型Redis为什么要使用I/O多路复用?首先,Redis是单线程运行的,所有的操作都是线性顺序执行的。但是,由于读写操作被阻塞等待用户输入或输出,所以I/O操作在正常情况下往往不能直接返回,会造成某个文件的I/O阻塞,导致整个流程无法为其他客户提供服务,I/O多路复用的出现就是为了解决这个问题。BlockingI/O先来看看传统的阻塞I/O模型是如何工作的:当使用read或write读写一个文件描述符(FileDescriptor以下简称FD)**时,如果当前FD不是可读或者可写,整个Redis服务不会响应其他操作,导致整个服务不可用。这是传统意义上的,也就是我们在编程中使用最多的阻塞模型:blocking-io阻塞模型在开发中很常见,也很容易理解,但是因为会影响其他FD对应的服务,所以需要处理with执行多个客户端任务时,通常不会使用阻塞模型。I/O多路复用虽然还有很多其他的I/O模型,这里不再详细介绍。阻塞I/O模型不能满足这里的需求。我们需要一个更高效的I/O模型来支持Redis的多客户端(redis-cli),这就涉及到I/O多路复用模型:I:O-Multiplexing-ModeI/O多路复用模型中,最重要的函数调用是select,可以同时监控多个文件描述符的可读性和可写性。当某些文件描述符可读或可写时,select方法返回可读和可写文件描述符的个数。关于select的具体使用,网上资料很多,这里就不过多介绍了;同时还有其他的I/O多路复用函数epoll/kqueue/evport,性能比select更好,同时也可以支持更多的服务。Reactor设计模式Redis服务采用Reactor实现文件事件处理器(每个网络连接实际上对应一个文件描述符)redis-reactor-pattern文件事件处理器使用I/O多路复用模块监听多个FD,当accept、read、write和关闭文件事件产生后,文件事件处理器会回调fd绑定的事件处理器。虽然整个文件事件处理器运行在单线程上,但是通过引入I/O多路复用模块,实现了对多个FD同时读写的监控,提高了网络通信模型的性能,并且可以也保证了整个Redis服务实现的简单性。I/O多路复用模块I/O多路复用模块封装了底层的select、epoll、avport、kqueue等I/O多路复用功能,并向上层提供相同的接口。ae-module这里简单介绍一下Redis是如何封装select和epoll的,简单了解下这个模块的功能。整个I/O复用模块抹平了不同平台I/O复用函数的差异,提供了相同的接口:staticintaeApiCreate(aeEventLoop*eventLoop)staticintaeApiResize(aeEventLoop*eventLoop,intsetsize)staticvoidaeApiFree(aeEventLoop)*eventLoop)staticintaeApiAddEvent(aeEventLoop*eventLoop,intfd,intmaskid)aeApiDelEvent(aeEventLoop*eventLoop,intfd,intmask)staticintaeApiPoll(aeEventLoop*eventLoop,structtimeval*tvp)同时,由于参数每个函数需要的参数不同,我们使用一个aeApiState来存储每个子模块需要的参数Context信息://selecttypedefstructaeApiState{fd_setrfds,wfds;fd_set_rfds,_wfds;}aeApiState;//epolltypedefstructaeApiState{intepfd;structepoll_event*events;}aeAPI状态;这些上下文信息会保存在eventLoop的void*state中,不会暴露给上层。仅在当前子模块中使用。封装select函数select,用于监控FD的可读、可写和错误状态。在介绍I/O多路复用模块如何封装select函数之前,先看一下select函数的大致流程:intfd=/*filedescriptor*/fd_setrfds;FD_ZERO(&rfds);FD_SET(fd,&rfds)for(;;){select(fd+1,&rfds,NULL,NULL,NULL);if(FD_ISSET(fd,&rfds)){/*filedescriptor`fd`becomesreadable*/}}初始化一个可读的fd_set集合,保存需要的Monitor可读的FD;使用FD_SET将fd添加到rfds;调用select方法监控rfds中的FD是否可读;当select返回时,查看fd的状态,完成相应的操作。Redis的ae_select文件中的代码组织顺序类似。首先在aeApiCreate函数中初始化rfds和wfds:staticintaeApiCreate(aeEventLoop*eventLoop){aeApiState*state=zmalloc(sizeof(aeApiState));if(!state)return-1;FD_ZERO(&state->rfds);FD_ZERO(&state->wfds);eventLoop->apidata=state;return0;}而aeApiAddEvent和aeApiDelEvent会通过FD_SET和FD_CLR修改fd_set中对应的FD标志:staticintaeApiAddEvent(aeEventLoop*eventLoop,intfd,intmask){aeApiState*state=eventLoop->apidata;if(mask&AE_READABLE)FD_SET(fd,&state->rfds);if(mask&AE_WRITABLE)FD_SET(fd,&state->wfds);return0;}整个ae_select子模块中最重要的函数是aeApiPoll,它是实际调用select函数的部分。它的作用是在I/O多路复用函数返回时,将对应的FD添加到aeEventLoop的fired数组中,返回事件编号Number:intretval,j,numevents=0;memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));retval=select(eventLoop->maxfd+1,&state->_rfds,&state->_wfds,NULL,tvp);if(retval>0){for(j=0;j<=eventLoop->maxfd;j++){intmask=0;aeFileEvent*fe=&eventLoop->events[j];if(fe->mask==AE_NONE)continue;if(fe->mask&AE_READABLE&&FD_ISSET(j,&state->_rfds))mask|=AE_READABLE;if(fe->mask&AE_WRITABLE&&FD_ISSET(j,&state->_wfds))mask|=AE_WRITABLE;eventLoop->fired[numevents].fd=j;eventLoop->fired[numevents].mask=mask;numevents++;}}returnnumevents;}封装epoll函数Redis对epoll的封装其实际上也是类似的,使用epoll_create创建epoll中使用的epfd:staticintaeApiCreate(aeEventLoop*eventLoop){aeApiState*state=zmalloc(sizeof(aeApiState));if(!state)return-1;state->events=zmalloc(sizeof(structepoll_event)*eventLoop->setsize);if(!state->events){zfree(state);return-1;}state->epfd=epoll_create(1024);/*1024isjustahintforthekernel*/if(state->epfd==-1){zfree(state->events);zfree(state);return-1;}eventLoop->apidata=state;return0;}在aeApiAddEvent中使用epoll_ctl将要监听的FD和监听的事件添加到epfd中:staticintaeApiAddEvent(aeEventLoop*eventLoop,intfd,intmask){aeApiState*state=eventLoop->apidata;structepoll_eventee={0};/*avoidvalgrindwarning*//*Ifthefdwasalreadymonitoredforsomeevent,weneedaMOD*operation.OtherwiseweneedanADDoperation.*/intop=eventLoop->events[fd].mask==AE_NONE?EPOLL_CTL_ADD:EPOLL_CTL_MOD;ee.events=0;mask|=eventLoopfd->events[].mask;/*Mergeoldevents*/if(mask&AE_READABLE)ee.events|=EPOLLIN;if(mask&AE_WRITABLE)ee.events|=EPOLLOUT;ee.data.fd=fd;if(epoll_ctl(state->epfd,op,fd,&ee)==-1)return-1;return0;}由于epoll与select的机制略有不同,所以epoll_wait函数返回时不需要遍历所有的fd来查看读写状态;当epoll_wait函数返回时,它会提供一个epoll_event数组:EPOLLOUT、EPOLLERR和EPOLLHUP)和事件发生的FDaeApiPoll函数只需要将epoll_event数组存储的信息添加到eventLoop的fired数组中,并将信息传递给上层模块:intretval,numevents=0;retval=epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp?(tvp->tv_sec*1000+tvp->tv_usec/1000):-1);if(retval>0){intj;numevents=retval;for(j=0;j
