当前位置: 首页 > 后端技术 > PHP

Swoole源码分析——基础模块之Channelqueue

时间:2023-03-29 13:49:01 PHP

前言内存数据结构Channel,类似Go的chanchannel,底层基于共享内存+Mutex互斥锁实现,可以实现高性能的内存队列在用户模式下。通道可用于多进程环境。底层在读写时会自动加锁,应用层无需担心数据同步问题。该频道出现在上一篇文章中。当时是manager和worker进程通信的重要数据结构。主要用于worker进程通知manager进程重启相应的worker进程。channel数据结构channel数据结构有很多属性,head是队列的头部位置,tail是队列的尾部位置,size是申请的队列内存大小,maxlen是每个队列元素的大小,head_tag和tail_tag用于指定队列的头尾循环是否重置回头。bytes是当前通道队列占用的内存大小,flag用于指定是否使用共享内存,是否使用锁,是否使用管道通知。mem是通道的第一个内存地址。typedefstruct_swChannel_item{int长度;chardata[0];}swChannel_item;typedefstruct_swChannel{off_thead;关闭尾巴;size_t尺寸;字符头标签;字符尾标;整数;整数最大值;/***数据长度,不包括结构*/size_t字节;内部标志;int最大长度;/***内存点*/void*mem;swLock锁;swPipenotify_fd;}swChannel;channelqueueswChannel_new创建队列创建队列就是根据flags初始化队列的各个属性,值得注意maxlen,在申请内存的时候,会申请更多的内存,防止内存越界.swChannel*swChannel_new(size_tsize,intmaxlen,intflags){断言(size>=maxlen);诠释;无效*内存;//使用共享内存if(flags&SW_CHAN_SHM){mem=sw_shm_malloc(size+sizeof(swChannel)+maxlen);}else{mem=sw_malloc(size+sizeof(swChannel)+maxlen);}if(mem==NULL){swWarn("swChannel_create:malloc(%ld)failed.",size);返回空值;}swChannel*object=mem;mem+=sizeof(swChannel);bzero(对象,sizeof(swChannel));//溢出空间对象->size=size;对象->内存=内存;对象->maxlen=maxlen;对象->标志=标志;//使用锁if(flags&SW_CHAN_LOCK){//initlockif(swMutex_create(&object->lock,1)<0){swWarn("mutexinitfailed.");返回空值;}}//使用通知if(flags&SW_CHAN_NOTIFY){ret=swPipeNotify_自动(&object->notify_fd,1,1);if(ret<0){swWarn("notify_fd初始化失败。");返回空值;}}returnobject;}swChannel_push加锁,然后调用swChannel_inswChannel_in逻辑很简单,把数据推到队列尾部,如果当前通道的尾部重置了,头部还没有重置,需要判断剩余内存是否足够。如果当前通道的末尾还没有重置,可以放心添加元素,因为object->size之前还有maxlen空余和实际申请的内存,不用考虑内存的问题出界。intswChannel_push(swChannel*object,void*in,intdata_length){assert(object->flag&SW_CHAN_LOCK);object->lock.lock(&object->lock);intret=swChannel_in(object,in,data_length);object->lock.unlock(&object->lock);返回ret;}#defineswChannel_full(ch)((ch->head==ch->tail&&ch->tail_tag!=ch->head_tag)||(ch->bytes+sizeof(int)*ch->num==ch->size))intswChannel_in(swChannel*object,void*in,intdata_length){assert(data_length<=object->maxlen);如果(swChannel_full(对象)){返回SW_ERR;}swChannel_item*项目;intmsize=sizeof(item->length)+data_length;if(object->tailhead){//没有足够的内存空间if((object->head-object->tail)mem+object->tail;object->tail+=msize;}else{item=object->mem+object->tail;object->tail+=msize;if(object->tail>=object->size){object->tail=0;object->tail_tag=1-object->tail_tag;}}对象->num++;对象->字节+=数据长度;项目->长度=数据长度;memcpy(item->data,in,data_length);returnSW_OK;}swChannel_pushdequeueswChannel_pushdequeue逻辑比较简单。获取队列的头部位置,然后复制头部数据。当头部超过尺寸值时,可以重新设置头部。intswChannel_pop(swChannel*object,void*out,intbuffer_length){assert(object->flag&SW_CHAN_LOCK);object->lock.lock(&object->lock);intn=swChannel_out(object,out,buffer_length);object->lock.unlock(&object->lock);返回n;}#defineswChannel_empty(ch)(ch->num==0)intswChannel_out(swChannel*object,void*out,intbuffer_length){if(swChannel_empty(object)){returnSW_ERR;}swChannel_item*item=object->mem+object->head;断言(buffer_length>=item->length);memcpy(out,item->data,item->length);object->head+=(item->length+sizeof(item->length));if(object->head>=object->size){object->head=0;object->head_tag=1-object->head_tag;}object->num--;对象->字节-=项目->长度;返回项目->长度;}