本文转载自微信公众号《编程杂技》,作者theanarkh。转载本文请联系编程杂技公众号。前言:假设读者已经了解协程的概念和实现协程的底层技术支持。本文将介绍如何在底层基础上实现协程以及协程的应用(更多基础知识点此[1])。libtask是由google老板RussCox(Go的核心开发者)编写的。本文介绍libtask的基本原理。我们从libtask的main函数开始。这个main函数就是我们在c语言中使用的c函数。libtask本身实现了main函数。用户在使用libtask时,需要实现taskmain函数。taskmain和main的函数声明是一样的。让我们看一下主要功能。intmain(intargc,char**argv){structsigactionsa,osa;//注册SIGQUIT信号处理函数memset(&sa,0,sizeofsa);sa.sa_handler=taskinfo;sa.sa_flags=SA_RESTART;sigaction(SIGQUIT,&sa,&osa);//保存命令行参数argv0=argv[0];taskargc=argc;taskargv=argv;if(mainstacksize==0)mainstacksize=256*1024;//创建第一个协程taskcreate(taskmainstart,nil,mainstacksize);//开始调度taskscheduler();fprint(2,"taskschedulerreturnedinmain!\n");abort();return0;}main函数主要的两个逻辑是taskcreate和taskscheduler函数。我们先看taskcreate。inttaskcreate(void(*fn)(void*),void*arg,uintstack){intid;Task*t;t=taskalloc(fn,arg,stack);taskcount++;id=t->id;//记录位置t->alltaskslot=nalltask;//保存到alltaskalltask[nalltask++]=t;//修改状态为就绪,可以被调度,加入就绪队列taskready(t);returnid;}taskcreate首先调用taskalloc来分配一个表示协程的结构Task。让我们看看这个结构体的定义。structTask{charname[256];//offsetknowntoacidcharstate[256];//前后指针Task*next;Task*prev;Task*allnext;Task*allprev;//执行上下文Contextcontext;//睡眠时间uvlongalarmtime;uintid;//堆栈信息uchar*stk;uintstksize;//是否退出intexiting;//alltask中的intalltaskslot索引;//是否为系统协程intsystem;//是否就绪;//入口函数void(*startfn)(void*);//入口参数void*startarg;//自定义数据void*udata;};然后看taskalloc的实现。//分配一个协程需要的内存,并初始化一些字段///结构本身的大小+栈大小t=malloc(sizeof*t+stack);memset(t,0,sizeof*t);//栈的内存位置t->stk=(uchar*)(t+1);//栈大小t->stksize=stack;//协程idt->id=++taskidgen;//协程工作函数和参数t->startfn=fn;t->startarg=arg;/*doareasonableinitialization*/memset(&t->context.uc,0,sizeoft->context.uc);sigemptyset(&zero);//初始化uc_sigmask字段为空,即不阻塞信号sigprocmask(SIG_BLOCK,&zero,&t->context.uc.uc_sigmask);/*mustinitializewithcurrentcontext*///初始化uc域getcontext(&t->context.uc)//设置协程执行时的栈位置和大小t->context.uc.uc_stack.ss_sp=t->stk+8;t->context.uc.uc_stack.ss_size=t->stksize-64;z=(ulong)t;y=z;z>>=16;/*隐藏undefined32-bitshiftfrom32-bitcompilers*/x=z>>16;//保存信息到uc域makecontext(&t->context.uc,(void(*)())taskstart,2,y,x);return;}taskalloc函数代码看起来很多,但是逻辑并不复杂。就是申请Task结构体需要的内存和执行栈的内存,然后初始化这样,协程就诞生了。然后执行taskready将协程加入就绪队列。//修改协程的状态为就绪并加入就绪队列voidtaskready(Task*t){t->ready=1;addtask(&taskrunqueue,t);}//将协程插入队列在另一个队列之前,将被移除voidaddtask(Tasklist*l,Task*t){if(l->tail){l->tail->next=t;t->prev=l->tail;}else{l->head=t;t->prev=nil;}l->tail=t;t->next=nil;}taskrunqueue记录了所有就绪的协程。协程创建并加入队列后,协程还没有开始执行。就像操作系统的进程和线程一样,需要一个调度器来调度执行。让我们看一下调度器的实现。//协程调度中心staticvoidtaskscheduler(void){inti;Task*t;for(;;){//没有用户协程,则退出if(taskcount==0)exit(taskexitval);//从就绪queue取出一个协程t=taskrunqueue.head;if(t==nil){fprint(2,"norunnabletasks!%dtasksstalled\n",taskcount);exit(1);}//从就绪队列中删除协程deltask(&taskrunqueue,t);t->ready=0;//保存当前正在执行的协程taskrunning=t;//切换次数加一tasknswitch++;//切换到t执行,将当前上下文保存到taskschedcontext中(即下面要执行的代码)contextswitch(&taskschedcontext,&t->context);//执行到这里表示没有协程在执行(tswitchedback),emptytaskrunning=nil;//刚刚执行的协程t退出if(t->exiting){//如果不是系统协程,则数量减一if(!t->system)taskcount--;//当前协程在alltask中的索引i=t->alltaskslot;//把最后一个协程改到当前协程的位置,因为他即将退出alltask[i]=alltask[--nalltask];//更新被替换协程的索引alltask[i]->alltaskslot=i;//释放堆内存free(t);}}}调度器的代码看起来很多,但是核心逻辑只是三个1Take从就绪队列中取出一个协程t,将t移出就绪队列2通过contextswitch切换到协程,在t中执行3协程t切换回调度中心。如果t已经退出,修改数据结构,回收其占用的内存。如果t不退出,继续调度其他协程执行。此时,协程开始运行。还有一个调度系统。这里的调度机制比较简单,就是按照先进先出的方式进行就绪调度,是非抢占式的。也就是说,没有时间片调度的概念。协程的执行时间由自己决定,放弃执行的权力也由自己控制。当协程不想执行时,可以调用taskyield让出CPU。//协程主动放弃cpuinttaskyield(void){intn;//当前切换协程的个数n=tasknswitch;//插入就绪队列等待后续调度taskready(taskrunning);taskstate("yield");//切换协程taskswitch();//等于0表示当前只有一个协程,调度时tasknswitch加一,所以这里减一returntasknswitch-n-1;}/*切换协程,taskrunning是正在执行的协程,taskschedcontext是调度协程(主线程)的上下文,切换到调度中心,保持当前上下文为taskrunning->context*/voidtaskswitch(void){needstack(0);contextswitch(&taskrunning->context,&taskschedcontext);}//实际切换协程的逻辑staticvoidcontextswitch(Context*from,Context*to){if(swapcontext(&from->uc,&to->uc)<0){fprint(2,"swapcontextfailed:%r\n");assert(0);}}yield的逻辑也很简单,因为coroutine不在readyque中ue在执行时,当协程准备让出cpu时,协程先将自己重新加入就绪队列,等待下一次调度执行。当然我们也可以直接调度contextswitch切换到其他协程。重点是什么时候应该释放cpu,什么时候应该安排执行。接下来会详细解释。至此,我们已经具备了支持协程所需的底层基础。我们看到这个实现的思路并不是很复杂。首先,有一个队列代表要执行的协程,每个协程对应一个Task结构。然后调度中心可以按照先进先出的方式不断地调度协程的执行。因为没有抢占机制,调度中心由协程自己驱动。协程需要主动让出CPU,将上下文切换回调度中心,以便调度中心进行下一轮调度。接下来我们看看,基于这些底层基础,如何实现一个基于协程的服务器。下面我们通过一个例子来说明。voidtaskmain(intargc,char**argv){//启动一个tcpserverif((fd=netannounce(TCP,0,atoi(argv[1])))<0){//...}//改为非阻塞模式fdnoblock(fd);//accept成功后创建客户端协程while((cfd=netaccept(fd,remote,&rport))>=0){taskcreate(proxytask,(void*)cfd,STACK);}}刚才我们说了taskmain就是我们要实现的函数。首先,通过netannounce创建一个tcpserver。然后把fd改成非阻塞的,这个很重要,因为后面调用accept的时候,如果是阻塞的文件描述符,会导致进程挂掉,非阻塞模式下,操作系统会返回错误EAGAIN代码,通过这个错误代码我们可以决定下一步要做什么。让我们看看netaccept的实现。//处理(移除)连接intnetaccept(intfd,char*server,int*port){intcfd,one;structsockaddr_insa;uchar*ip;socklen_tlen;//向epoll注册事件,等待事件触发fdwait(fd,'r');len=sizeofsa;//trigger表示有连接后,再执行acceptif((cfd=accept(fd,(void*)&sa,&len))<0){return-1;}//通信与客户端fd也改为非阻塞模式fdnoblock(cfd);one=1;setsockopt(cfd,IPPROTO_TCP,TCP_NODELAY,(char*)&one,sizeofone);returncfd;}netaccept是处理tcp连接一个通过一是通过调用accept,但是在accept之前,有一个很重要的操作fdwait。//协程需要切换,因为它在等待iovoidfdwait(intfd,intrw){//是epollif(!startedfdtask){startedfdtask=1;epfd=epoll_create(1);//如果没有初始化,就创建一个coroutineanddoioManagementtaskcreate(fdtask,0,32768);}structepoll_eventev={0};//记录事件对应的协程和感兴趣的事件ev.data.ptr=taskrunning;switch(rw){case'r':EV。events|=EPOLLIN|EPOLLPRI;break;case'w':ev.events|=EPOLLOUT;break;}intr=epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);//切换到其他协程,等待被唤醒taskswitch();//epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&ev);}fdwait先将fd注册到epoll中,然后切换协程到下一个要执行的协程。这里有一个细节,当协程X被调度执行时,它在就绪队列之外,而taskswitch函数只是实现了切换上下文到调度中心,调度中心会从就绪队列中选择下一个协程执行,那么此时已经离开就绪队列的协程X处于孤立状态,似乎已经不能被调度中心选中执行了。处理这个问题的方法是将协程、fd和感兴趣的事件信息一起注册到epoll中。当epoll监听到某个fd的事件时,会把对应的协程添加到就绪队列中,这样就可以调度协程执行了。epoll相关的逻辑是在fdwait函数的最开始处理的。epoll的逻辑也是在一个协程中执行的,但是epoll所在的协程不同于一般的协程,类似于操作系统的内核线程,epoll所在的协程变成了一个系统协程,也就是它不是用户定义的,而是系统定义的。我们来看看实现voidfdtask(void*v){inti,ms;Task*t;uvlongnow;//成为系统协程tasksystem();structepoll_eventevents[1000];for(;;){/*leteveryoneelserun*///如果大于0,说明还有其他准备好的协程可以执行,让他们先执行,否则执行while(taskyield()>0);/*we'retheonlyonerunnable-pollfori/o*/errno=0;//没有定时事件一直阻塞if((t=sleeping.head)==nil)ms=-1;else{/*sleepatmost5s*/now=nsec();if(now>=t->alarmtime)毫秒=0;elseif(now+5*1000*1000*1000LL>=t->alarmtime)ms=(t->alarmtime-now)/1000000;elsems=5000;}intnevents;//等待事件发生,ms为等待超时时间if((nevents=epoll_wait(epfd,events,1000,ms))<0){if(errno==EINTR)continue;fprint(2,"epoll:%s\n",strerror(errno));taskexitall(0);}/*wakeuptheguyswhodeserveit*///事件触发,将对应协程插入就绪队列for(i=0;i
